HADOOP-18410. S3AInputStream.unbuffer() not releasing http connections #4766
Conversation
The async stream draining only executes (in the other thread) if the original invoker waits for the result. This is not caused by some synchronized conflict: I made the method being invoked static to ensure that. The logging of the start and finish of the call is present, just not the bit in the middle. A test run with `join()` to make it blocking works. That would appear to leave some aspect of CompletableFuture, possibly related to how things are being wrapped in duration tracking, audit spans etc. But I can't see it, and they all seem to work everywhere else.

But without the join, no joy, even though the log above shows the drain is being executed in the new thread.

Then the timeout surfaces on the next read().
…p connections. Lots more logging at debug, more comments, *no idea why async drain doesn't work*. Also, adaptive changed the stream to go from sequential to random on unbuffer(), as it is clear the caller is doing clever things. Change-Id: I4ea2d902e5ae1c0db630091eed55e5916db97534
force-pushed from 5f8fcf6 to f199851
the key cause is that even though the fields passed in to drain() were converted to references through the methods, in the lambda expression passed in to submit, they are direct references:

```
operation = client.submit(
    () -> drain(uri, streamStatistics, false, reason, remaining,
        object, wrappedStream));  /* here */
```

the fields are only read during the async execution, not during the submit phase. The next thing the code does is reset those fields, so the async work fails with an NPE which isn't being noted. Adding the join() after submit works because it was inserted before the fields were set to null, so no NPE was silently raised. Change-Id: I417108db0daa78f711eb8dc1ed96ef619437a2bf
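The capture-timing bug described above can be reproduced without any S3A code. This is a toy sketch, not the S3AInputStream implementation: a lambda handed to an executor reads a *field* of the enclosing object, so the value is fetched when the task runs, not when it is submitted; nulling the field right after `submit()` makes the async task NPE. Copying the field to a local first captures the value instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy reproduction of the capture-timing bug discussed in this PR.
// All names here are invented for illustration.
public class LambdaCaptureTiming {

  String wrappedStream = "open-http-stream";

  // Buggy version: the lambda reads this.wrappedStream at execution time.
  Future<String> submitBuggy(ExecutorService pool) {
    return pool.submit(() -> drain(wrappedStream)); // field read later, not now
  }

  // Fixed version: snapshot the field into a local before building the lambda,
  // so the lambda captures the value, not the field reference.
  Future<String> submitFixed(ExecutorService pool) {
    final String stream = wrappedStream;
    return pool.submit(() -> drain(stream));
  }

  public static String drain(String stream) {
    // throws NPE if stream == null, mimicking the silent async failure
    return "drained " + stream.length() + " bytes";
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // hold the pool busy so the field can be nulled before either task runs
    CountDownLatch gate = new CountDownLatch(1);
    pool.submit(() -> {
      try { gate.await(); } catch (InterruptedException ignored) { }
    });

    LambdaCaptureTiming t = new LambdaCaptureTiming();
    Future<String> buggy = t.submitBuggy(pool);
    Future<String> fixed = t.submitFixed(pool);
    t.wrappedStream = null;   // what the stream code did right after submit()
    gate.countDown();

    System.out.println("fixed: " + fixed.get());
    try {
      buggy.get();
      System.out.println("buggy: completed (unexpected)");
    } catch (ExecutionException e) {
      System.out.println("buggy failed with: " + e.getCause());
    }
    pool.shutdown();
  }
}
```

Running it shows the fixed submission draining successfully while the buggy one fails with a NullPointerException, exactly the failure mode the join() had been masking.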
tested: s3 london
reviewed myself; needs an iteration
@@ -604,7 +604,7 @@ public synchronized void close() throws IOException {
     try {
       stopVectoredIOOperations.set(true);
       // close or abort the stream; blocking
-      awaitFuture(closeStream("close() operation", false, true));
+      closeStream("close() operation", false, true);
because this is blocking there's no need for that await future, but i think i will reinstate it for safety
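The point above is that a synchronous close hands back an already-completed future, so awaiting it cannot block and is purely defensive. A minimal sketch (invented names, not the real closeStream() signature):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of a close that is sometimes synchronous, sometimes async,
// but always returns a future for a uniform API. Names are hypothetical.
public class CompletedFutureSketch {

  public static CompletableFuture<Long> closeStream(boolean blocking) {
    if (blocking) {
      long drained = drainNow();   // work done inline on the caller's thread
      // already complete: a later join()/awaitFuture() on this is a no-op
      return CompletableFuture.completedFuture(drained);
    }
    // async path: here the caller genuinely needs to await the result
    return CompletableFuture.supplyAsync(CompletedFutureSketch::drainNow);
  }

  static long drainNow() {
    return 0L;                     // stand-in for the actual drain work
  }
}
```

The blocking path's future reports isDone() immediately, which is why dropping the awaitFuture() there is safe, and why reinstating it costs nothing.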
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
as this draining code is used in prefetch too, i'm going to pull it out into its own class. the isolation lets me add unit tests for its failure cases
Pulls out draining code into its own class SDKStreamDrainer.

1. This is a CallableRaisingIOE so can be passed into submit without any lambda expression wrapping.
2. Used in normal and prefetching streams.
3. Has unit tests of failure modes.

Change-Id: Id476f9029613c24b1070c3645ce84643b7705ed7
the last patch factors out stream draining and adds tests for corner cases, especially escalation from read to abort. it is used in classic and prefetching streams, with unit tests.

one long-standing aspect of this design is that read(buffer) will swallow any IOE raised on any read() used to fill the buffer, other than the first read(). so if a socket exception is raised partway through the read, it won't get noticed on that call, only on the subsequent one. need to modify the test to mimic this
…ead(buf) better. Any IOE in read() is only thrown if it happens when reading the first byte of the buffer. Change-Id: I53fba27bfacd105f4c3ea4128202a3215456c001
🎊 +1 overall
This message was automatically generated.
I was wondering, outside the scope of this PR, if we can also provide an S3AInputStream API to retrieve the lastAccessedTimestamp for
if there already is a field there, yes. i've been wondering if there was a way to do some retirement of long-lived input streams. it doesn't matter so much with vectored IO or prefetch, as both will be doing shorter-lived IO
Hmm, might need some sort of "self expiring cache" kind of thing
either the fs tracks all open streams (weak ref map) and scans them, or each stream schedules a worker to run every few minutes which will release the stream if idle. the fs is probably simpler, at least in terms of scheduling, thread shutdown etc
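The first option above, the filesystem tracking open streams in a weak-reference map and scanning them, could look something like this sketch. Everything here (StreamJanitor, IdleStream, the methods) is hypothetical, not an S3A API:

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;

// Sketch of fs-side idle-stream retirement: a WeakHashMap so closed and
// garbage-collected streams drop out on their own, plus a scan pass that
// releases any stream idle for too long. In the real thing scan() would
// run on a scheduled executor every few minutes.
public class StreamJanitor {

  public interface IdleStream {
    long lastAccessed();   // epoch millis of the last read()
    void unbuffer();       // release the http connection
  }

  private final Map<IdleStream, Boolean> open =
      Collections.synchronizedMap(new WeakHashMap<>());
  private final long idleLimitMillis;

  public StreamJanitor(long idleLimitMillis) {
    this.idleLimitMillis = idleLimitMillis;
  }

  public void track(IdleStream s) {
    open.put(s, Boolean.TRUE);
  }

  /** One scan pass; returns how many idle streams were released. */
  public int scan(long now) {
    int released = 0;
    synchronized (open) {            // WeakHashMap iteration needs the lock
      for (IdleStream s : open.keySet()) {
        if (now - s.lastAccessed() > idleLimitMillis) {
          s.unbuffer();
          released++;
        }
      }
    }
    return released;
  }
}
```

Keeping the map weak means the janitor never pins a stream in memory, which is what makes the fs-side approach simpler than per-stream scheduled workers.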
need some reviews here. this is a critical bug |
LGTM +1, really nice test coverage.
  throw new IllegalStateException(
      "duplicate invocation of drain operation");
}
boolean executeAbort = shouldAbort;
Why create a new temp variable?
shouldAbort is a final arg; executeAbort will be set to true if draining doesn't work
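That escalation pattern, copying the final argument into a mutable local and promoting it to an abort when graceful draining fails, can be sketched as follows. This is a hedged illustration with invented names, not the SDKStreamDrainer code; plain close() stands in for the SDK's abort on the wrapped stream:

```java
import java.io.InputStream;

// Sketch of the drain/abort escalation discussed in this review thread.
public class DrainSketch {

  /** @return true if the stream ended up aborted. */
  public static boolean drain(final boolean shouldAbort, InputStream wrapped) {
    boolean executeAbort = shouldAbort;   // mutable copy of the final arg
    if (!executeAbort) {
      try {
        // read off the remaining bytes then close, so the underlying
        // http connection can go back to the pool for reuse
        while (wrapped.read() >= 0) {
          // discard
        }
        wrapped.close();
        return false;                     // drained and closed cleanly
      } catch (Exception e) {
        executeAbort = true;              // draining failed: escalate
      }
    }
    // abort path: give up on connection reuse; close() here stands in
    // for the SDK abort() call on the real wrapped stream
    try {
      wrapped.close();
    } catch (Exception ignored) {
      // nothing more can be done
    }
    return true;
  }
}
```

A healthy stream drains and returns false; any mid-drain failure flips executeAbort and the method reports the abort, which is the failure mode the new unit tests exercise.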
 * @return the drainer.
 */
private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
  Assertions.assertThat(drainer)
Assertions.assertThat(drainer.isAborted())
.isTrue();
Why not use this?
because on an assertion failure, we get drainer.toString() in the generated message.
still trying to find the best way to use assertj for simple true/false settings. this is a bit overcomplex, but it should be the most informative on a failure
private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
  Assertions.assertThat(drainer)
      .matches(SDKStreamDrainer::isAborted, "isAborted");
  return drainer;
return value is never used
not yet...
Overall looks good, left a few minor comments
    return false;
  } catch (Exception e) {
    // exception escalates to an abort
    LOG.debug("When closing {} stream for {}, will abort the stream",
Since we are changing the behaviour by aborting the stream, shall we have this log at WARN level at least?
don't want logs full of warnings as they create issues of their own. seen it too many times.
streamStatistics.streamClose(true, remaining);
LOG.debug("Stream {} {}: {}; remaining={}",
    uri, (shouldAbort ? "aborted" : "closed"), reason,
(shouldAbort ? "aborted" : "closed") can be replaced with just "aborted"? If the stream is closed, we would not reach here.
LOG.debug("Switching to Random IO seek policy after unbuffer() invoked");
setInputPolicy(S3AInputPolicy.Random);
nit: how about?
final S3AInputPolicy newPolicy = S3AInputPolicy.Random;
LOG.debug("Switching to {} policy after unbuffer() invoked", newPolicy);
setInputPolicy(newPolicy);
done
public FileSystem getBrittleFS() {
  return brittleFS;
}
nit: given that brittleFS usages are private, perhaps we don't need this getter method?
it's there. not going to revert it now.
LG, some minor comments and suggestions. Really liked the tests.
@@ -184,12 +184,17 @@
  <value>true</value>
</property>
nit: extra blank line
...ols/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
  describe("Starting read/unbuffer #%d", i);
  in.read();
  in.unbuffer();
}
We can assert the number of aborts collected in IOStats after the for loop (StreamStatisticNames.STREAM_READ_ABORTED) to be 10 in this test and 0 in the above test.
done, plus asserts on the fs to verify propagation (and find bugs where they don't)
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java
...ols/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
* some of the feedback
* verify seek policy changes/doesn't change on unbuffer as appropriate.
* tests assert on iostats of stream and fs
* which identified that the FS wasn't counting unbuffer events. fixed

Change-Id: Ia4b9182cf0d61078085ea0551b3293a5ed86bbc5
Change-Id: I3745e86eccf4c51d6f35fdce5dbca75cc40e9036
tested, s3 london
Change-Id: If2499c6d47a2c5502f2f963ad57f638334336b04
...of import ordering and changes to S3ARemoteObject Change-Id: Ieb2c1a11e6893fa57099e320eab5ef352c6fdcf1
ok, i'm merging this; mukund's vote, mehakmeet's comments and yetus are all happy
…ions (apache#4766)

HADOOP-16202 "Enhance openFile()" added asynchronous draining of the remaining bytes of an S3 HTTP input stream for those operations (unbuffer, seek) where it could avoid blocking the active thread.

This patch fixes the asynchronous stream draining so that it works and so returns the stream back to the http pool.

Without this, whenever unbuffer() or seek() was called on a stream and an asynchronous drain triggered, the connection was not returned; eventually the pool would be empty and subsequent S3 requests would fail with the message "Timeout waiting for connection from pool".

The root cause was that even though the fields passed in to drain() were converted to references through the methods, in the lambda expression passed in to submit, they were direct references:

operation = client.submit(
    () -> drain(uri, streamStatistics, false, reason, remaining,
        object, wrappedStream));  /* here */

Those fields were only read during the async execution, at which point they would have been set to null (or even a subsequent read).

A new SDKStreamDrainer class performs the draining; this is a Callable and can be submitted directly to the executor pool. The class is used in both the classic and prefetching s3a input streams.

Also, calling unbuffer() switches the S3AInputStream from adaptive to random IO mode; that is, it is considered a cue that future IO will not be sequential, whole-file reads.

Contributed by Steve Loughran.

Change-Id: Ia43339302dbe837ceee4bcfc83fd9624b3c4992c
…ions -prefetch changes(#4766) Changes in HADOOP-18410 which are needed for the S3A prefetching stream; needed as part of the HADOOP-18703 backport Change-Id: Ib403ca793e29a4416e5d892f9081de5832da3b68
Lots more logging at debug, more comments, no idea why async drain doesn't work.

Also, adaptive changed the stream to go from sequential to random on unbuffer(), as it is clear the caller is doing clever things. Still doesn't make things work.
update: the second patch fixes it. It was a race condition in when the values of the fields were evaluated.
How was this patch tested?
staring at test results all afternoon commenting lines on and off.
no regression testing of existing tests.
For code changes:
If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?