Fix IndexerWorkerClient#fetchChannelData when response has data and error.#15084
Fix IndexerWorkerClient#fetchChannelData when response has data and error.#15084cryptoe merged 2 commits intoapache:masterfrom
Conversation
…rror. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
| if (backpressureFuture != null) { | ||
| clientResponseObj.setBackpressureFuture(backpressureFuture); | ||
| } | ||
| clientResponseObj.addBytesRead(chunkSize); |
There was a problem hiding this comment.
Nit: I would expect to add bytes read at the end before returning the clientResponseObject.
More of a readability thing.
There was a problem hiding this comment.
Moved to the line before the return.
| } | ||
| catch (Exception e) { | ||
| clientResponseObj.exceptionCaught(e); | ||
| } |
There was a problem hiding this comment.
Nit: toSkip> chunk size since we have already have that data in the channel, so we basically do nothing. Can we add an explicit else and mention that in a comment.
There was a problem hiding this comment.
The else would have en empty body, so instead of adding that, I just added a comment next to the else if.
| if (backpressureFuture != null) { | ||
| clientResponseObj.setBackpressureFuture(backpressureFuture); | ||
| } | ||
| clientResponseObj.addBytesRead(chunkSize); |
There was a problem hiding this comment.
In the skip case we are not basically reading all the bytes. We are skipping to chunkSize - (int) toSkip]. So does adding in all the bytes is valid?
There was a problem hiding this comment.
It is, since "bytes read" tracks how many bytes have been read from the response, not how many have been written to the channel. Its purpose is to let us know when to stop skipping (we keep incrementing it until it passes channel.getBytesAdded() - startOffset). I added a comment about this.
|
Thanks @gianm for the clarifying comments. !! |
…rror. (apache#15084) * Fix IndexerWorkerClient#fetchChannelData when response has data and error. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
…rror. (apache#15084) * Fix IndexerWorkerClient#fetchChannelData when response has data and error. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.