diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java new file mode 100644 index 0000000000000..8e5d5b027bec4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; + +import java.io.IOException; +import java.util.zip.DeflaterOutputStream; + +/** + * This class exists to provide a stream with optional compression. This is useful as using compression + * requires that the underlying {@link DeflaterOutputStream} be closed to write EOS bytes. However, the + * {@link BytesStream} should not be closed yet, as we have not used the bytes. This class handles these + * intricacies. + * + * {@link CompressibleBytesOutputStream#materializeBytes()} should be called when all the bytes have been + * written to this stream. If compression is enabled, the proper EOS bytes will be written at that point. + * The underlying {@link BytesReference} will be returned. + * + * {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and + * can be safely released. + */ +final class CompressibleBytesOutputStream extends StreamOutput implements Releasable { + + private final StreamOutput stream; + private final BytesStream bytesStreamOutput; + private final boolean shouldCompress; + + CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { + this.bytesStreamOutput = bytesStreamOutput; + this.shouldCompress = shouldCompress; + if (shouldCompress) { + this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput)); + } else { + this.stream = bytesStreamOutput; + } + } + + /** + * This method ensures that compression is complete and returns the underlying bytes. + * + * @return bytes underlying the stream + * @throws IOException if an exception occurs when writing or flushing + */ + BytesReference materializeBytes() throws IOException { + // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. + // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when + // passed to the deflater stream. + if (shouldCompress) { + stream.close(); + } + + return bytesStreamOutput.bytes(); + } + + @Override + public void writeByte(byte b) throws IOException { + stream.write(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + stream.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public void close() { + if (stream == bytesStreamOutput) { + assert shouldCompress == false : "If the streams are the same we should not be compressing"; + IOUtils.closeWhileHandlingException(stream); + } else { + assert shouldCompress : "If the streams are different we should be compressing"; + IOUtils.closeWhileHandlingException(stream, bytesStreamOutput); + } + } + + @Override + public void reset() throws IOException { + stream.reset(); + } +} diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index de083ead10e36..209127b1cddde 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1031,16 +1030,18 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target if (compress) { options = TransportRequestOptions.builder(options).withCompress(true).build(); } + + // only compress if asked and the request is not bytes. Otherwise only + // the header part is compressed, and the "body" can't be extracted as compressed + final boolean compressMessage = options.compress() && canCompress(request); + status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage); boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); try { - // only compress if asked, and, the request is not bytes, since then only - // the header part is compressed, and the "body" can't be extracted as compressed - if (options.compress() && canCompress(request)) { + if (compressMessage) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } // we pick the smallest of the 2, to support both backward and forward compatibility @@ -1051,18 +1052,16 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target stream.setVersion(version); threadPool.getThreadContext().writeTo(stream); stream.writeString(action); - BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); + BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener onRequestSent = new SendListener( - () -> IOUtils.closeWhileHandlingException(finalStream, bStream), + SendListener onRequestSent = new SendListener(stream, () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { if (!addedReleaseListener) { - IOUtils.close(stream, bStream); + IOUtils.close(stream); } } } @@ -1125,27 +1124,25 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress()); boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); try { if (options.compress()) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); - BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); + BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), + SendListener listener = new SendListener(stream, () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { - IOUtils.close(stream, bStream); + IOUtils.close(stream); } } } @@ -1173,8 +1170,8 @@ final BytesReference buildHeader(long requestId, byte status, Version protocolVe /** * Serializes the given message into a bytes representation */ - private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, - ReleasableBytesStreamOutput writtenBytes) throws IOException { + private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, + CompressibleBytesOutputStream stream) throws IOException { final BytesReference zeroCopyBuffer; if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead BytesTransportRequest bRequest = (BytesTransportRequest) message; @@ -1185,12 +1182,12 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer message.writeTo(stream); zeroCopyBuffer = BytesArray.EMPTY; } - // we have to close the stream here - flush is not enough since we might be compressing the content - // and if we do that the close method will write some marker bytes (EOS marker) and otherwise - // we barf on the decompressing end when we read past EOF on purpose in the #validateRequest method. - // this might be a problem in deflate after all but it's important to close it for now. - stream.close(); - final BytesReference messageBody = writtenBytes.bytes(); + // we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream + // might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker) + // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the + // #validateRequest method. this might be a problem in deflate after all but it's important to write + // the marker bytes. + final BytesReference messageBody = stream.materializeBytes(); final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length()); return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } diff --git a/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java new file mode 100644 index 0000000000000..721f53f10f9cb --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.EOFException; +import java.io.IOException; + +public class CompressibleBytesOutputStreamTests extends ESTestCase { + + public void testStreamWithoutCompression() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + BytesReference bytesRef = stream.materializeBytes(); + + assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + + StreamInput streamInput = bytesRef.streamInput(); + byte[] actualBytes = new byte[expectedBytes.length]; + streamInput.readBytes(actualBytes, 0, expectedBytes.length); + + assertEquals(-1, streamInput.read()); + assertArrayEquals(expectedBytes, actualBytes); + stream.close(); + + // The bytes should be zeroed out on close + for (byte b : bytesRef.toBytesRef().bytes) { + assertEquals((byte) 0, b); + } + } + + public void testStreamWithCompression() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + BytesReference bytesRef = stream.materializeBytes(); + + assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + + StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bytesRef.streamInput()); + byte[] actualBytes = new byte[expectedBytes.length]; + streamInput.readBytes(actualBytes, 0, expectedBytes.length); + + assertEquals(-1, streamInput.read()); + assertArrayEquals(expectedBytes, actualBytes); + stream.close(); + + // The bytes should be zeroed out on close + for (byte b : bytesRef.toBytesRef().bytes) { + assertEquals((byte) 0, b); + } + } + + public void testCompressionWithCallingMaterializeFails() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + + StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput()); + byte[] actualBytes = new byte[expectedBytes.length]; + EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); + assertEquals("Unexpected end of ZLIB input stream", e.getMessage()); + + stream.close(); + } + + private static byte[] randomBytes(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < bytes.length; ++i) { + bytes[i] = randomByte(); + } + return bytes; + } + + private static class ZeroOutOnCloseStream extends BytesStreamOutput { + + @Override + public void close() { + int size = (int) bytes.size(); + bytes.set(0, new byte[size], 0, size); + } + } +}