HADOOP-18637:S3A to support upload of files greater than 2 GB using DiskBlocks #5481
base: trunk
Conversation
Added some comments.
Have you run all the types of buffer tests?
Added tests for setting and not setting the new configuration and validating the behavior. When not set, the upload should happen via multipart; otherwise, via a single PUT.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -595,7 +596,7 @@ public void initialize(URI name, Configuration originalConf)
     }
     blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
         DEFAULT_FAST_UPLOAD_BUFFER);
-    partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
+    //partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
cut
@@ -1831,6 +1832,11 @@ private FSDataOutputStream innerCreateFile(
     final PutObjectOptions putOptions =
         new PutObjectOptions(keep, null, options.getHeaders());

+    if(!checkDiskBuffer(getConf())){
just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.
> just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.
This is still pending. I don't really mind leaving it as it is but I think my suggestion is consistent with other parts of the code and is more readable.
CC @steveloughran
public static boolean checkDiskBuffer(Configuration conf){
    boolean isAllowedMultipart = conf.getBoolean(ALLOW_MULTIPART_UPLOADS,
        IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT);
    if (isAllowedMultipart) {
this is wrong here I guess.
if isAllowedMultipart is enabled then FAST_UPLOAD_BUFFER must be disk else we throw an error right?
If multipart is disabled and the FAST_UPLOAD_BUFFER is not disk then we throw an error.
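The combined rule from this thread, and the `validateOutputStreamConfiguration()` shape suggested above, can be sketched in plain Java. This is an illustration, not the PR's code: a `java.util.Map` stands in for Hadoop's `Configuration`, and the key strings are assumptions based on the discussion.

```java
import java.util.Map;

public class OutputStreamConfigCheck {

  // Hypothetical stand-ins for the S3A constants discussed in the thread.
  static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
  static final String FAST_UPLOAD_BUFFER = "fs.s3a.fast.upload.buffer";
  static final String FAST_UPLOAD_BUFFER_DISK = "disk";

  /**
   * Throw instead of returning a boolean the caller must check:
   * if multipart uploads are disabled, only disk buffering can
   * handle uploads larger than a single in-memory block.
   */
  static void validateOutputStreamConfiguration(Map<String, String> conf) {
    boolean multipartEnabled = Boolean.parseBoolean(
        conf.getOrDefault(MULTIPART_UPLOADS_ENABLED, "true"));
    String buffer = conf.getOrDefault(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
    if (!multipartEnabled && !FAST_UPLOAD_BUFFER_DISK.equals(buffer)) {
      throw new IllegalStateException("Multipart uploads are disabled but "
          + FAST_UPLOAD_BUFFER + " is '" + buffer + "'; expected '"
          + FAST_UPLOAD_BUFFER_DISK + "'");
    }
  }

  public static void main(String[] args) {
    // Allowed: multipart off, disk buffering.
    validateOutputStreamConfiguration(
        Map.of(MULTIPART_UPLOADS_ENABLED, "false", FAST_UPLOAD_BUFFER, "disk"));
    // Forbidden: multipart off, array buffering.
    try {
      validateOutputStreamConfiguration(
          Map.of(MULTIPART_UPLOADS_ENABLED, "false", FAST_UPLOAD_BUFFER, "array"));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The throw-in-the-validator shape keeps the call site in `innerCreateFile()` to a single line, which is the readability point being made above.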
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@steveloughran could you review this please? Thanks.
did a review.
add a test in ITestS3AConfiguration to verify that a forbidden config (multipart off and disk buffering) raises an exception
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
...op-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java
final tuning
 * be used as the file size might be bigger than the buffer size that can be
 * allocated.
 * @param conf
 * @return
nit: document conf argument and return value
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1859,7 +1859,7 @@ private FSDataOutputStream innerCreateFile(
         .withPutOptions(putOptions)
         .withIOStatisticsAggregator(
             IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
-        .withMultipartAllowed(getConf().getBoolean(
+        .withMultipartEnabled(getConf().getBoolean(
i think the multipart enabled flag should be made a field and stored during initialize(), so we can save on scanning the conf map every time a file is created.
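The field-caching suggestion, as a minimal sketch: read the flag once in `initialize()` and reuse it on every file creation. A plain `Map` stands in for `Configuration`, and the class and method names are hypothetical.

```java
import java.util.Map;

public class CachedFlagExample {

  /** Hypothetical minimal model of the filesystem's lifecycle. */
  static class MiniFS {
    // Field populated once, during initialize().
    private boolean isMultipartUploadEnabled;

    void initialize(Map<String, String> conf) {
      // One configuration lookup for the lifetime of the FS instance.
      isMultipartUploadEnabled = Boolean.parseBoolean(
          conf.getOrDefault("fs.s3a.multipart.uploads.enabled", "true"));
    }

    // Consulted on every create(); no per-call conf map scan.
    boolean isMultipartUploadEnabled() {
      return isMultipartUploadEnabled;
    }
  }

  public static void main(String[] args) {
    MiniFS fs = new MiniFS();
    fs.initialize(Map.of("fs.s3a.multipart.uploads.enabled", "false"));
    System.out.println("multipart enabled: " + fs.isMultipartUploadEnabled());
  }
}
```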
//First one being the creation of test/ directory marker
//Second being the creation of the file with tests3ascale/<file-name>
//Third being the creation of directory marker tests3ascale/ on the file delete
assertEquals(3L,
use IOStatisticAssertions here; it generates AssertJ assertion chains from lookups with automatic generation of error text.
assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
.isEqualTo(3);
Looking good. added some minor comments.
Seems like these points still need to be addressed as discussed before.
- Error in staging committer based on new config.
- Error in magic committer based on new config.
- Error in write operations helper based on new config.
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
some minor comments.
now, what about multipart uploads (as you mentioned to me offline)
- the request factory changes guarantee it won't work, but it would be good to have it fail faster.
- s3afs.createMultipartUploader() should fail the way it does with isCSEEnabled; add a test to verify this.
other than that, all looks great!
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
protected Configuration createScaleConfiguration() {
    Configuration configuration = super.createScaleConfiguration();
    configuration.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
    configuration.setLong(MULTIPART_SIZE, 53687091200L);
is this some special value? if so: make a constant, explain what it is.
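For what it's worth, 53687091200 is exactly 50 GiB, which suggests a part size large enough to keep the whole upload in one part; if that is the intent, a named constant would document it. The constant name below is illustrative, not from the PR.

```java
public class PartSizeConstant {
  // 53687091200 bytes spelled out as 50 GiB; the constant name is illustrative.
  static final long FIFTY_GIB = 50L * 1024 * 1024 * 1024;

  public static void main(String[] args) {
    System.out.println(FIFTY_GIB);                 // the raw value
    System.out.println(FIFTY_GIB == 53687091200L); // confirms the arithmetic
  }
}
```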
...aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java
Almost ready to go. Some minor tuning.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
...aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java
Path commitPath = getFileSystem().makeQualified(
    new Path(getContract().getTestPath(), "/testpath"));
LOG.debug("{}", commitPath);
assertThrows(PathCommitException.class,
same intercept.
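"intercept" here presumably refers to Hadoop's `LambdaTestUtils.intercept`, which, unlike a bare `assertThrows`, returns the caught exception so the test can assert on its details. A self-contained sketch of that pattern — the helper below is written from scratch as an illustration, not the real `LambdaTestUtils`:

```java
import java.util.concurrent.Callable;

public class InterceptExample {

  /**
   * Run the callable, fail if it completes or throws the wrong type,
   * and return the caught exception for further assertions.
   */
  static <E extends Exception> E intercept(Class<E> clazz, Callable<?> eval)
      throws Exception {
    try {
      Object result = eval.call();
      throw new AssertionError("Expected " + clazz.getSimpleName()
          + " but call returned: " + result);
    } catch (Exception e) {
      if (clazz.isInstance(e)) {
        return clazz.cast(e);   // caller can inspect message, cause, etc.
      }
      throw e;                  // wrong exception type: propagate
    }
  }

  public static void main(String[] args) throws Exception {
    Exception ex = intercept(IllegalStateException.class, () -> {
      throw new IllegalStateException("multipart upload is disabled");
    });
    System.out.println("caught: " + ex.getMessage());
  }
}
```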
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
Have you run all the AWS integration tests @HarshitGupta11? A lot of tests are failing.
@@ -369,6 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
   */
  @Retries.RetryTranslated
  private void initMultipartUpload() throws IOException {
+    Preconditions.checkState(!isMultipartUploadEnabled,
this is wrong.
+1
Ran the AWS test suite on default settings and am seeing a lot of failures. Some notable ones:
[ERROR] testAbortAfterTwoPartUpload(org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits) Time elapsed: 9.331 s <<< FAILURE!
java.lang.AssertionError: upload must not have completed: unexpectedly found s3a://mehakmeet-singh-data/fork-0001/test/testAbortAfterTwoPartUpload as S3AFileStatus{path=s3a://mehakmeet-singh-data/fork-0001/test/testAbortAfterTwoPartUpload; isDirectory=false; length=5242880; replication=1; blocksize=33554432; modification_time=1681199546000; access_time=0; owner=mehakmeet.singh; group=mehakmeet.singh; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=true; isErasureCoded=false} isEmptyDirectory=FALSE eTag=7afe7425a06fe7b3eec28c310e4b5a7e versionId=null
at org.junit.Assert.fail(Assert.java:89)
at org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist(ContractTestUtils.java:1018)
at org.apache.hadoop.fs.contract.AbstractFSContractTestBase.assertPathDoesNotExist(AbstractFSContractTestBase.java:330)
at org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.testAbortAfterTwoPartUpload(ITestS3AMultipartUploadSizeLimits.java:158)
This one I'm a little skeptical about; it could be my own machine/config issue, so I would like others to run this once (it still fails when run alone):
[ERROR] Failures:
[ERROR] ITestS3ACommitterMRJob.test_200_execute:295->Assert.fail:89 Job job_1681203442293_0003 failed in state FAILED with cause Task failed task_1681203442293_0003_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0
.
Consult logs under /Users/mehakmeet.singh/workstation/osource/hadoop-trunk-review/hadoop/hadoop-tools/hadoop-aws/target/test/data/yarn-2023-04-11-14.26.54.78/yarn-2011546580
This one Mukund has highlighted already:
[ERROR] testCommitterWithDuplicatedCommit(org.apache.hadoop.fs.s3a.commit.magic.ITestMagicCommitProtocol) Time elapsed: 6.838 s <<< ERROR!
java.lang.IllegalStateException: multipart upload is disabled
at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:269)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.initMultipartUpload(S3ABlockOutputStream.java:376)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:209)
[ERROR] testReplaceWithDeleteFailure(org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedJobCommit) Time elapsed: 3.09 s <<< ERROR!
org.apache.hadoop.fs.s3a.commit.PathCommitException: `s3a://bucket-name/output/path': Multipart uploads are disabled for the FileSystem, the committer can't proceed.
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.<init>(AbstractS3ACommitter.java:221)
There are a few more failures, but it's best to run the test suite and debug the cause from there.
@@ -217,6 +217,10 @@ protected AbstractS3ACommitter(
     LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
         role, jobName(context), jobIdString(context), outputPath);
     S3AFileSystem fs = getDestS3AFS();
+    if (!fs.isMultipartUploadEnabled()) {
So we want to fail any S3A committer initialization if multipart is disabled? IIRC the magic committer does require multipart, but should we be failing for the others as well? CC @steveloughran
Also, it seems like a lot of tests are failing when I run the suite on default props (by default this should be true and not fail here); it could be due to UTs using "MockS3AFileSystem", which doesn't actually initialize and set the variable.
they all use multiparts as that is how they write-but-don't-commit the data. this is something harshit and I worked on
minor tunings for the production code, a bit of the testing too
@@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   */
  private ArnResource accessPoint;

+  /**
+   * Is this S3A FS instance has multipart uploads enabled?
grammar nit
"is multipart upload enabled?"
@@ -1854,7 +1863,8 @@ private FSDataOutputStream innerCreateFile(
         .withCSEEnabled(isCSEEnabled)
         .withPutOptions(putOptions)
         .withIOStatisticsAggregator(
-            IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
+            IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+        .withMultipartEnabled(isMultipartUploadEnabled);
nit, indentation. should be aligned with the .with above
        MULTIPART_UPLOAD_ENABLED_DEFAULT);
    if (isMultipartUploadEnabled) {
      return true;
    } else if (!isMultipartUploadEnabled && conf.get(FAST_UPLOAD_BUFFER)
can be simplified to
return isMultipartUploadEnabled
|| FAST_UPLOAD_BUFFER_DISK.equals(conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
that default in conf.get is critical to prevent NPEs if the option is unset; moving the constant first is even more rigorous.
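The null-safety point can be demonstrated in isolation. A sketch with plain strings instead of a Configuration object (names mirror the review comment): with the constant on the left of `equals`, even a null lookup result is handled without an NPE.

```java
public class NullSafeConfigCheck {

  static final String FAST_UPLOAD_BUFFER_DISK = "disk";

  // The reviewer's simplified form: multipart enabled, or disk buffering.
  static boolean checkDiskBuffer(boolean multipartEnabled, String bufferValue) {
    // Constant-first equals: safe even when bufferValue is null,
    // which is what conf.get(key) returns for an unset option.
    return multipartEnabled || FAST_UPLOAD_BUFFER_DISK.equals(bufferValue);
  }

  public static void main(String[] args) {
    System.out.println(checkDiskBuffer(true, null));    // multipart on: always acceptable
    System.out.println(checkDiskBuffer(false, null));   // unset and no default: rejected, no NPE
    System.out.println(checkDiskBuffer(false, "disk")); // disk buffering: acceptable
  }
}
```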
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
-      @Nullable final PutObjectOptions options) {
+      @Nullable final PutObjectOptions options) throws IOException {
+    if (!isMultipartUploadEnabled) {
+      throw new IOException("Multipart uploads are disabled on the given filesystem.");
make a PathIOException and include destkey. This gives a bit more detail.
throw new PathIOException(destKey, "Multipart uploads are disabled");
...aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java
...a/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
💔 -1 overall
This message was automatically generated.
added a change in #5543 to pull in here. now, what do we do for large file renames? currently the transfer manager uses that part size to trigger use of MPUs in renames; it doesn't use our request factory so it won't surface. we could add a modified auditor which would trigger an exception on any MPU initialisation POST, then make sure the huge file renames don't trigger it...
or just bypass the xfer manager entirely in this world and do a single copy request?
Description of PR
Use S3A DiskBlocks to support the upload of files greater than 2 GB. Currently, the max upload size of a single block is ~2 GB.
How was this patch tested?
The patch was tested against us-west-2.
For code changes:
If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?