Skip to content

Commit

Permalink
Merge pull request #11988 from jpountz/fix/zlib_skip_underlying
Browse files Browse the repository at this point in the history
Transport: Do not make the buffer skip while a stream is open.
  • Loading branch information
jpountz committed Jul 2, 2015
2 parents 6bf543d + 7482e13 commit 6765635
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 48 deletions.
Expand Up @@ -85,13 +85,12 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumlation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);

long requestId = buffer.readLong();
byte status = buffer.readByte();
Version version = Version.fromId(buffer.readInt());

StreamInput wrappedStream = null;
boolean success = false;
try {
long requestId = streamIn.readLong();
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());

if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
Compressor compressor;
try {
Expand All @@ -106,52 +105,40 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
sb.append("]");
throw new IllegalStateException(sb.toString());
}
wrappedStream = compressor.streamInput(streamIn);
} else {
wrappedStream = streamIn;
streamIn = compressor.streamInput(streamIn);
}
wrappedStream.setVersion(version);
streamIn.setVersion(version);

if (TransportStatus.isRequest(status)) {
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
boolean success = false;
try {
final int nextByte = wrappedStream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
success = true;
} finally {
if (!success) {
buffer.readerIndex(expectedIndexReader);
}
String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);

// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}

} else {
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
handlerResponseError(streamIn, handler);
} else {
handleResponse(ctx.getChannel(), wrappedStream, handler);
handleResponse(ctx.getChannel(), streamIn, handler);
}
} else {
// if its null, skip those bytes
buffer.readerIndex(markedReaderIndex + size);
}

boolean success = false;
try {
final int nextByte = wrappedStream.read();
// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
Expand All @@ -164,15 +151,20 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
success = true;
} finally {
if (!success) {
buffer.readerIndex(expectedIndexReader);
}

}
}
} finally {
IOUtils.close(wrappedStream);
try {
if (success) {
IOUtils.close(streamIn);
} else {
IOUtils.closeWhileHandlingException(streamIn);
}
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(expectedIndexReader);
}
}
}

Expand Down
Expand Up @@ -575,7 +575,6 @@ public void handleException(TransportException exp) {

@Test
@TestLogging(value = "test. transport.tracer:TRACE")
@AwaitsFix(bugUrl = "@spinscale is looking into failures: http://build-us-00.elastic.co/job/es_core_master_strong/3986/")
public void testTracerLog() throws InterruptedException {
TransportRequestHandler handler = new TransportRequestHandler<StringMessageRequest>() {
@Override
Expand Down

0 comments on commit 6765635

Please sign in to comment.