-
Notifications
You must be signed in to change notification settings - Fork 24.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add CompressibleBytesOutputStream for compression #24927
Conversation
I was particularly interested in feedback on this question: Should the CompressibleBytesOutputStream class live in:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good left some suggestions
|
||
import java.io.IOException; | ||
|
||
public class CompressibleBytesOutputStream extends StreamOutput implements Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets keep this class here and make it pkg private and final. it's very special
* @return bytes underlying the stream | ||
* @throws IOException if an exception occurs when writing or flushing | ||
*/ | ||
public BytesStream finishStream() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just call this public BytesReference materializeAndClose() throws IOException
and don't even return the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about:
public BytesReference materialize() throws IOException
or
public BytesReference materializeBytes() throws IOException
I just kind of feel weird about materializeAndClose()
because calling that does not release you from the obligation that you still must call the regular close()
.
|
||
import java.io.IOException; | ||
|
||
public class CompressibleBytesOutputStream extends StreamOutput implements Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have a simple unittest for this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some nit picky things, mostly because it is fun to have opinions. Feel free to ignore.
this.bytesStreamOutput = bytesStreamOutput; | ||
this.shouldCompress = shouldCompress; | ||
if (shouldCompress) { | ||
this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I don't like mixing this.stream
with stream
. It makes me think you are shadowing and need this.
sometimes. No big deal, but it slows down reading to have to follow that little mental dead end.
|
||
@Override | ||
public void close() { | ||
// If we are not using compression stream == bytesStreamOutput |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it'd actually be clearer not to have shouldCompress
and instead check for reference equality here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ also add an assertion that check that shouldCompress
has the corresponding value?
boolean addedReleaseListener = false; | ||
StreamOutput stream = Streams.flushOnCloseStream(bStream); | ||
try { | ||
// only compress if asked, and, the request is not bytes, since then only |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably should move this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice improvement! left a few comments
|
||
import java.io.IOException; | ||
|
||
final class CompressibleBytesOutputStream extends StreamOutput implements Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a class level javadoc about the class and why/what it is used for
|
||
@Override | ||
public void close() { | ||
if (stream == bytesStreamOutput) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use shouldCompress
here to make it a little clearer?
if (shouldCompress) {
IOUtils.closeWhileHandlingException(stream, bytesStreamOutput);
} else {
assert stream == bytesStreamOutput : "the stream variable is not the same instance as bytesStreamOutput";
IOUtils.closeWhileHandlingException(stream);
}
|
||
// only compress if asked and the request is not bytes. Otherwise only | ||
// the header part is compressed, and the "body" can't be extracted as compressed | ||
boolean compressMessage = options.compress() && canCompress(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this final?
StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput()); | ||
byte[] actualBytes = new byte[expectedBytes.length]; | ||
streamInput.readBytes(actualBytes, 0, expectedBytes.length); | ||
fail("Expected to receive EOFException"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use expectThrows
?
// this might be a problem in deflate after all but it's important to close it for now. | ||
stream.close(); | ||
final BytesReference messageBody = writtenBytes.bytes(); | ||
// we have to call finishStream here before accessing the bytes. A CompressibleBytesOutputStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/finishStream/materializeBytes
stream.close(); | ||
final BytesReference messageBody = writtenBytes.bytes(); | ||
// we have to call finishStream here before accessing the bytes. A CompressibleBytesOutputStream | ||
// might be implementing compression. And finish stream ensures that some marker bytes (EOS marker) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/finish stream/materialize bytes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@s1monw Are you good with these changes? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a suggestion otehrwise LGTM nice simplification
stream.close(); | ||
} | ||
|
||
return bytesStreamOutput.bytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also close the bytesStreamOutput
after this? or can we simply call close() before it seems like it's safe for the BytesStreamOutput#close()
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. The bytesStreamOutput is still being used at that point. That's why this is separating calling the compressing stream and the bytes stream. Must close the compressing stream to write EOS bytes. But wait until we are done using the underlying bytes to close the byte stream.
Alright once ci finishes I will merge this. I'm not sure if you all noticed, but
I left it on Simon's suggestion which kind of seemed like a compromise between the differing suggestions. |
This is related to elastic#24927. There was a small possibility that a test was attempting to compress a stream with zero bytes. This was causing a failure. This test now requires at least one byte.
This is related to #24927. There was a small possibility that a test was attempting to compress a stream with zero bytes. This was causing a failure. This test now requires at least one byte.
This is a follow-up to elastic#23941. Currently there are a number of complexities related to compression. The raw DeflaterOutputStream must be closed prior to sending bytes to ensure that EOS bytes are written. But the underlying ReleasableBytesStreamOutput cannot be closed until the bytes are sent to ensure that the bytes are not reused. Right now we have three different stream references hanging around in TCPTransport to handle this complexity. This commit introduces CompressibleBytesOutputStream to be one stream implemenation that will behave properly with or without compression enabled.
This is related to #24927. There was a small possibility that a test was attempting to compress a stream with zero bytes. This was causing a failure. This test now requires at least one byte.
This is a follow-up to #23941. Currently there are a number of
complexities related to compression. The raw
DeflaterOutputStream
mustbe closed prior to sending bytes to ensure that EOS bytes are written.
But the underlying
ReleasableBytesStreamOutput
cannot be closed untilthe bytes are sent to ensure that the bytes are not reused.
Right now we have three different stream references hanging around in
TCPTransport
to handle this complexity. This commit introducesCompressibleBytesOutputStream
to be one stream implemenation that willbehave properly with or without compression enabled.