HADOOP-13560 block output streams #130
Conversation
Force-pushed d8679cf to d483376.
Steve, thank you for the new revision. I have a few more small comments, entered on specific lines.
if (!state.equals(Closed)) {
  try {
    enterState(null, Closed);
  } catch (IllegalStateException ignored) {
If I understand correctly, this can't throw the exception unless we have a bug in our code. Is it better to let the IllegalStateException be thrown so that we see that sooner?
I know it can't happen, but I like to close off all failure routes of a close() call. Plus, I think it may have dated from when some IOE was thrown. Anyway, throwing again.
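The pattern under discussion can be sketched as follows. This is an illustrative reconstruction, not the actual S3ADataBlocks code: the `DestState` values and the `enterState(current, next)` signature mirror the snippet above, while the method bodies and class name are assumptions.

```java
import java.io.Closeable;

// Illustrative sketch of the state check discussed above; DestState and
// enterState() mirror the snippet, but the bodies are assumptions, not
// the actual S3ADataBlocks code.
public class DataBlockSketch implements Closeable {

  enum DestState { Writing, Upload, Closed }

  private volatile DestState state = DestState.Writing;

  // Check the current state (null means "accept any"), then transition.
  synchronized void enterState(DestState current, DestState next) {
    if (current != null && state != current) {
      throw new IllegalStateException(
          "Expected state " + current + " but found " + state);
    }
    state = next;
  }

  DestState getState() {
    return state;
  }

  // Per the review outcome: no catch block, so an IllegalStateException
  // from a state-machine bug surfaces instead of being swallowed.
  @Override
  public void close() {
    if (!state.equals(DestState.Closed)) {
      enterState(null, DestState.Closed);
    }
  }

  public static void main(String[] args) {
    DataBlockSketch block = new DataBlockSketch();
    block.close();
    block.close(); // second close is a no-op: state is already Closed
    System.out.println(block.getState());
  }
}
```

Because close() guards on the current state before transitioning, the call stays idempotent even with the catch block removed.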
* @throws IOException IOE raised on FileOutputStream
*/
@Override
void flush() throws IOException {
Call `super.flush()` to trigger the validation check for the `Writing` state.
good catch
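The fix agreed above can be sketched like this. The class and field names are illustrative, not the Hadoop code (the real method is declared `throws IOException`); the point is only that the override must call `super.flush()` so the base class's Writing-state validation still runs.

```java
// Sketch of the point above: an overriding flush() should call
// super.flush() so the base class's state validation still runs.
// Names are illustrative assumptions, not the Hadoop code.
public class FlushSketch {

  enum DestState { Writing, Closed }

  static class BaseBlock {
    DestState state = DestState.Writing;

    // Base flush: validate that the block is still writable.
    void flush() {
      if (state != DestState.Writing) {
        throw new IllegalStateException("flush() on a " + state + " block");
      }
    }
  }

  static class DiskBlock extends BaseBlock {
    boolean flushed = false;

    @Override
    void flush() {
      super.flush();   // the fix: trigger the state validation first
      flushed = true;  // then do the subclass's own flushing work
    }
  }

  public static void main(String[] args) {
    DiskBlock block = new DiskBlock();
    block.flush();
    System.out.println(block.flushed);
  }
}
```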
dataSize = buffer.size();
ByteArrayInputStream bufferData = new ByteArrayInputStream(
    buffer.toByteArray());
buffer.reset();
I was thinking you could remove the `buffer.reset()`, because the next line is dropping the reference to `buffer` anyway.
OK
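Why the reset() is redundant can be shown with a small sketch; the helper and class names are illustrative, not the Hadoop code. `toByteArray()` returns a copy, so once the caller drops its reference to the buffer there is nothing left to reset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Sketch of the point above: once toByteArray() has copied the contents
// and the caller drops its reference to the buffer, reset() is redundant.
// Names are illustrative, not the Hadoop code.
public class BufferSnapshotSketch {

  static ByteArrayInputStream snapshot(ByteArrayOutputStream buffer) {
    // toByteArray() returns a copy, so the snapshot is independent of
    // the buffer; no buffer.reset() is needed before discarding it.
    return new ByteArrayInputStream(buffer.toByteArray());
  }

  public static void main(String[] args) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    buffer.write(1);
    buffer.write(2);
    buffer.write(3);
    ByteArrayInputStream data = snapshot(buffer);
    buffer = null; // reference dropped, as in the code under review
    System.out.println(data.available());
  }
}
```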
@@ -1250,6 +1569,144 @@ can be used:
Using the explicit endpoint for the region is recommended for speed and the
ability to use the V4 signing API.

## "Timeout waiting for connection from pool" when writing to S3A
I tried an `mvn site` build, and it looks like the new troubleshooting sections still aren't nested correctly. I believe it should be `###` instead of `##`.
Did that everywhere; updated. Also, in the troubleshooting section on S3A memory, I just pointed back to the thread tuning entry.
this.progressListener = (progress instanceof ProgressListener) ?
    (ProgressListener) progress
    : new ProgressableListener(progress);
LOG.debug("Initialized S3ABlockOutputStream for {}" +
I think `activeBlock` is always `null` when this log statement executes.
correct! swapped order of log and action
@@ -1093,12 +1101,48 @@
</property>

<property>
<name>fs.s3a.fast.upload</name>
<name>fs.s3a.block.output</name>
Is this revision missing the changes to restore/un-deprecate `fs.s3a.fast.upload`?
Afraid so; that bit missed the push, as I forgot to --force the patch up at the end of the day. Will push it up with all the comments here, after another test run.
Force-pushed 3f3baaf to aa49f2c.
*/
static abstract class DataBlock implements Closeable {

  private volatile DestState state = Writing;
I'm nitpicking here, but wouldn't it make more sense to define DestState here instead of on line 272? Moving that line here would improve code readability imo but wouldn't change any behaviour.
done
* @param key S3 object to work on.
* @param executorService the executor service to use to schedule work
* @param progress report progress in order to prevent timeouts. If
*     this class implements {@code ProgressListener} then it will be
This method is passed an object, not a class. You probably meant "If this object implements ..."
correct. your diligence in reading javadocs is appreciated
* @return the active block; null if there isn't one.
* @throws IOException on any failure to create
*/
private synchronized S3ADataBlocks.DataBlock maybeCreateBlock()
The lazy creation in this method is nice, but the "maybe" in its name gives a false impression of arbitrariness involved. "createBlockIfNeeded" might be a better naming option.
renamed
Force-pushed c26877e to d6f8202.
+1 (non-binding) based on review. Testing is ongoing, we'll report our findings.

The total number of threads performing work across all threads is set by
fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
work items.
The total max block (memory/disk) consumption, across all streams, is bounded by `fs.s3a.multipart.size * (fs.s3a.fast.upload.active.blocks + fs.s3a.max.total.tasks + 1)` bytes for an instance of S3AFileSystem.
You know, now that you can have a queue per stream, it could be set to something bigger. This is something we could look at in the docs, leaving it out of the XML so as to have a single topic. This phrase here describes the number of active threads, which is different, and will be more so once there's other work (COPY, DELETE) going on there. So: won't change here.
Completely agree. A bit further down I propose to add a single explanation in the javadoc and link to there in the various other locations
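The bound proposed above can be sanity-checked with a small calculation. The concrete configuration values below are illustrative assumptions, not the shipped defaults.

```java
// Worked example of the bound proposed above:
//   fs.s3a.multipart.size * (fs.s3a.fast.upload.active.blocks
//                            + fs.s3a.max.total.tasks + 1)
// The concrete values here are illustrative assumptions, not defaults.
public class MemoryBoundSketch {

  static long maxBufferedBytes(long multipartSize,
      int activeBlocks, int maxTotalTasks) {
    // One block per active upload, one per queued task, plus the block
    // currently being written to.
    return multipartSize * (activeBlocks + maxTotalTasks + 1);
  }

  public static void main(String[] args) {
    long multipartSize = 8L * 1024 * 1024; // 8 MiB parts (assumed)
    int activeBlocks = 4;                  // assumed
    int maxTotalTasks = 5;                 // assumed
    // 8 MiB * (4 + 5 + 1) = 80 MiB per S3AFileSystem instance
    System.out.println(
        maxBufferedBytes(multipartSize, activeBlocks, maxTotalTasks));
  }
}
```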
// Trigger an upload then process the remainder.
LOG.debug("writing more data than block has capacity -triggering upload");
uploadCurrentBlock();
// tail recursion is mildly expensive, but given buffer sizes must be MB.
FYI, up to 10k. That's AWS's limit on the number of parts in a single multipart upload.
OK. I've set that limit in Constants and will log @ error if the # of blocks exceeds it. We'll see what happens.
We can. With the min part size of 5MB you need a 50GB upload to test this. That will take a while against AWS. We can test this cheaply against our S3-clone, but at least that will test the log @ error.
@pieterreuse please add this to our test plan
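The "50GB" figure above follows directly from the two limits mentioned in this thread; a quick worked check:

```java
// Worked check of the 10k-part limit discussed above: at the 5 MiB
// minimum part size, the largest possible multipart upload is about
// 50 GB, which is why such a huge upload is needed to exercise it.
public class PartLimitSketch {

  static final int MAX_PARTS = 10_000;                // AWS per-upload limit
  static final long MIN_PART_SIZE = 5L * 1024 * 1024; // 5 MiB minimum part

  // Largest upload that fits in MAX_PARTS parts of the minimum size.
  static long maxUploadAtMinPartSize() {
    return (long) MAX_PARTS * MIN_PART_SIZE;
  }

  public static void main(String[] args) {
    // 10,000 * 5 MiB = 52,428,800,000 bytes, i.e. the "50GB" above
    System.out.println(maxUploadAtMinPartSize());
  }
}
```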
* This was taken from {@code S3AFastOutputStream} and has the
* same problem which surfaced there: it consumes heap space
* proportional to the mismatch between writes to the stream and
* the JVM-wide upload bandwidth to the S3 endpoint.
but bounded by ...
the amount of memory requested for each container.

The slower the write bandwidth to S3, the greater the risk of running out
of memory.
Memory usage is bounded to ...

The total number of threads performing work across all threads is set by
fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
work items.
idem as in pom.xml
Again, not changing it in either place, as once renames() parallelize, life gets more complex.

The amount of data which can be buffered is limited by the available
size of the JVM heap. The slower the write bandwidth to S3, the greater
the risk of heap overflows.
idem
adding link to the s3a_fast_upload_thread_tuning section

```
#### <a name="s3a_fast_upload_thread_tuning"></a>S3A Fast Upload Thread Tuning
As a (probably better) alternative to my other comments, we could explain the bound on the memory consumption here once and link to it.
yep.

These charges can be reduced by enabling `fs.s3a.multipart.purge`,
and setting a purge time in seconds, such as 86400 seconds —24 hours, after
which the S3 service automatically deletes outstanding multipart
To me, the wording here gives the impression this is a server-side operation but the purging happens on the client by listing all uploads and then sending a delete call with the ones to be purged. Consequently, this can cause a (slight) delay when instantiating an s3a FS instance and there are lots of active uploads (to purge).
Does it? Never knew that. I'd thought it was server side. Will change. Also, we could make that an async operation; it's not needed to bring up the FS.
The grunt work is done in `com.amazonaws.services.s3.transfer.TransferManager#abortMultipartUploads`.
And yes making async is again a very good idea here.
…directly interacted with the s3 client into a new inner class of S3AFileSystem, WriteOperationState. This cleanly separates interaction between the output stream (buffering of data and queuing of uploads) and the upload process itself.

I think S3Guard may be able to do something with this, but I also hope to use it as a start for async directory list/delete operations; this class would track create-time probes, and initiate the async deletion of directory objects after a successful write. That's why there are separate callbacks for writeSuccessful and writeFailed: we will only want to spawn off the deletion when the write succeeded.

In the process of coding all this, I managed to break multipart uploads; this has led to a clearer understanding of how part uploads fail, and an improvement in statistics collection and in the test. Otherwise:

* trying to get the imports in sync with branch-2; the IDE somehow rearranged things.
* docs in more detail
… configurable test timeout in maven, pre-flight validation of timeout in big files (and a suggestion of a new timeout size to use); bandwidth stats printed on intermediate writes and on upload callbacks, so helping differentiate buffer write and upload speeds, and give someone logging the files something interesting to look at.
…tive block; remove unimplemented (and hard to implement meaningfully) bandwidth gauge; diff against branch-2 to reduce delta as much as possible (IDE import changes)
…ng and (b) WARNing.
…block output stream. This makes it consistent with its (now deleted) predecessor; that is un-deprecated, with all configuration options changed to use fast.upload in their names and FAST_UPLOAD in their field names. I've tried to document all this, and add a new section on tuning queue sizes.
…provements to the docs
* mark some package scoped/inner classes as final
* chop down lines where appropriate
* rename some variables, and even when private final, wrap access from subclasses in accessors. (needless, IMO)

Not done, hence checkstyle will still complain; I don't intend to address these:

* chop javadoc lines with link/crossref entries > 80 chars
* use of tests named test_040_PositionedReadHugeFile() and public void test_050_readHugeFile() in AbstractSTestS3AHugeFiles. This class has a test runner which runs the tests in alphabetical order; they must run in sequence. The naming scheme is designed to achieve this, and to highlight that the numbering scheme here is special.
* use of _1MB and _1KB constants. They're sizes, I like them like that.
static {
  Configuration.addDeprecations(new Configuration.DeprecationDelta[]{
      new Configuration.DeprecationDelta("fs.s3a.threads.core",
          null,
I'm not familiar with DeprecationDeltas, but this null value gave rise to a NullPointerException in all unit tests when fs.s3a.threads.core was in my config. Replacing this null with "" (empty string) resolved my issue, but I'm not 100% sure that is the right thing to do here.
I've just cut that section entirely. That's harsh, but, well, the fast output stream was always marked as experimental... we've learned from the experiment and are now changing behaviour here, which is something we can look at covering in the release notes. I'll add that to the JIRA.
That indeed fixes the problems I had, thx for looking into this.
Force-pushed d6f8202 to 726876d.
@steveloughran I'm trying this as part of 3.0.0-alpha2 (it's exactly what I was looking for after running into the same OOM problems) and wondering when it cleans up the disk-cached blocks. I'm generating a ~50GB file on an instance with ~6GB free when the process starts. My expectation is that local copies of the blocks would be deleted after those parts finish uploading, but I'm seeing more than 15 blocks in [...]. I can't confirm that any parts have finished uploading, though I suspect they have. I see that [...]
Can you comment on that in a JIRA, not a PR? Thanks
That's https://issues.apache.org/jira/secure/Dashboard.jspa ; project HADOOP, component fs/s3. They should be deleted as soon as the upload completes; the [...]
Merge commit of latest code.
Docs, XML configs up to speed
scale tests only run with a -Pscale option.
some props can be configured in POM