HADOOP-16193. add extra S3A MPU test to see what happens if a file is created during the MPU #609

Closed

@@ -17,20 +17,31 @@
 */
package org.apache.hadoop.fs.contract.s3a;

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;

import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;

/**
 * Test MultipartUploader with S3A.
@@ -159,4 +170,47 @@ public void testDirectoryInTheWay() throws Exception {
  public void testMultipartUploadReverseOrder() throws Exception {
    ContractTestUtils.skip("skipped for speed");
  }

  /**
   * This creates and then deletes a zero-byte file while an upload
   * is in progress, and verifies that the uploaded file is ultimately
   * visible.
   */
  @Test
  public void testMultipartOverlapWithTransientFile() throws Throwable {
    // until there's a way to explicitly ask for a multipart uploader from a
    // specific FS, explicitly create one bonded to the raw FS.
    describe("testMultipartOverlapWithTransientFile");
    S3AFileSystem fs = getFileSystem();
    Path path = path("testMultipartOverlapWithTransientFile");
    fs.delete(path, true);
    MultipartUploader mpu = mpu(1);
    UploadHandle upload1 = mpu.initialize(path);
    byte[] dataset = dataset(1024, '0', 10);
    final Map<Integer, PartHandle> handles = new HashMap<>();
    // upload 1K as part 1
    LOG.info("Uploading multipart entry");
    PartHandle value = mpu.putPart(path, new ByteArrayInputStream(dataset), 1,
        upload1,
        dataset.length);
    handles.put(1, value);
    // confirm the path is absent
    ContractTestUtils.assertPathDoesNotExist(fs,
        "path being uploaded", path);
    // now create an empty file
    ContractTestUtils.touch(fs, path);
    final FileStatus touchStatus = fs.getFileStatus(path);
    LOG.info("0-byte file has been created: {}", touchStatus);
    fs.delete(path, false);
    // now complete the upload
    mpu.complete(path, handles, upload1);

Is there anything we can assert about the PathHandle returned here?

Contributor Author

that it's not empty?

That you can use it to open the file again and read it?

Contributor Author

You may be able to use a PathHandle, but these aren't PathHandles, are they? They're PartHandles: serializable representations of an uploaded file part.

ehiggs, Apr 24, 2019

No, I mean mpu.complete returns a PathHandle. The part handles are PartHandles. Indeed PartHandles aren't useful out of the context of putPart (intentionally). But PathHandle is indeed useful and can be used to open the file to make sure it's the same file.

We use the path later to get the mpuStatus, so maybe this is just moot.

Contributor Author

I don't think I fully understand PathHandles. I thought they were an HDFS-only thing, right? Or is there something obvious I haven't noticed?

(Pauses, looks at code.) I see: the S3A MPU returns the etag, which isn't enough to refer to the file when reopening it.

We've been doing work on S3 & versioning now: one thing I'd like to get back from the upload is a reference which can be guaranteed to open that specific file, or fail fast if it has changed; something like (path, etag, version). Returning these immediately from the upload in an S3AFileStatus would be one approach, but a PathHandle for S3A which contained all of those would be exactly what someone needs, wouldn't it?

Do you fancy getting busy on that?

> one thing I'd like to get back from the upload is a reference which can be guaranteed to open that specific file

That does sound better. We should make a ticket as soon as Apache JIRA is back online.
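The (path, etag, version) reference proposed above can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not an S3A or Hadoop API: `ObjectRef`, `StoredObject`, `StaleReferenceException`, `put` and `open` are hypothetical names, and an in-memory map stands in for the bucket. What it shows is the contract being discussed: opening through the reference either returns exactly the object the upload produced, or fails fast once the etag or version behind the path has changed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class VersionedHandleSketch {

  /** Hypothetical (path, etag, versionId) triple; not an S3A or Hadoop class. */
  static final class ObjectRef {
    final String path;
    final String etag;
    final String versionId; // may be null, e.g. for an unversioned bucket

    ObjectRef(String path, String etag, String versionId) {
      this.path = Objects.requireNonNull(path);
      this.etag = etag;
      this.versionId = versionId;
    }
  }

  /** In-memory stand-in for the current state of one object in the store. */
  static final class StoredObject {
    final String etag;
    final String versionId;
    final byte[] data;

    StoredObject(String etag, String versionId, byte[] data) {
      this.etag = etag;
      this.versionId = versionId;
      this.data = data;
    }
  }

  /** Thrown when the object behind a reference was replaced or deleted. */
  static final class StaleReferenceException extends RuntimeException {
    StaleReferenceException(String msg) {
      super(msg);
    }
  }

  private final Map<String, StoredObject> store = new HashMap<>();

  /** Simulates a completed upload: store the object, return a reference to it. */
  ObjectRef put(String path, String etag, String versionId, byte[] data) {
    store.put(path, new StoredObject(etag, versionId, data));
    return new ObjectRef(path, etag, versionId);
  }

  /** Opens exactly the referenced object, or fails fast if it has changed. */
  byte[] open(ObjectRef ref) {
    StoredObject current = store.get(ref.path);
    if (current == null) {
      throw new StaleReferenceException("deleted: " + ref.path);
    }
    if (!Objects.equals(current.etag, ref.etag)
        || !Objects.equals(current.versionId, ref.versionId)) {
      throw new StaleReferenceException("changed: " + ref.path);
    }
    return current.data;
  }

  public static void main(String[] args) {
    VersionedHandleSketch s3 = new VersionedHandleSketch();
    ObjectRef ref = s3.put("bucket/file", "etag-1", "v1", new byte[1024]);
    System.out.println(s3.open(ref).length);            // 1024
    s3.put("bucket/file", "etag-2", "v2", new byte[0]); // overwritten
    try {
      s3.open(ref);
    } catch (StaleReferenceException e) {
      System.out.println(e.getMessage());               // changed: bucket/file
    }
  }
}
```

Bundling the three fields into one value, rather than returning a bare etag, is what would let a caller reopen "that specific file" later; how S3A would actually check the etag and version on open is left open here.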


    // wait for the data to arrive
    eventually(timeToBecomeConsistentMillis(), 500, () -> {
      FileStatus mpuStatus = fs.getFileStatus(path);
      assertTrue("File is empty in " + mpuStatus, mpuStatus.getLen() > 0);
      return mpuStatus;
steveloughran marked this conversation as resolved.
    });

  }
}
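The last step of the test polls with `eventually(timeToBecomeConsistentMillis(), 500, ...)` because the completed upload may not be visible immediately. The retry-until-deadline idea can be sketched as below. This is a simplified stand-in written for illustration, not the implementation of Hadoop's `LambdaTestUtils.eventually`, and the class name `EventuallySketch` is hypothetical. It catches `AssertionError` as well as exceptions because the test's probe signals "not yet" through `assertTrue`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public final class EventuallySketch {

  private EventuallySketch() {
  }

  /**
   * Repeatedly evaluates {@code probe} until it returns normally, sleeping
   * {@code intervalMillis} between attempts. Once the deadline has passed,
   * the most recent failure is attached as the cause of a TimeoutException.
   */
  public static <T> T eventually(long timeoutMillis, long intervalMillis,
      Callable<T> probe) throws Exception {
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      try {
        return probe.call();
      } catch (Exception | AssertionError e) {
        if (System.currentTimeMillis() >= deadline) {
          TimeoutException timeout = new TimeoutException(
              "condition not met within " + timeoutMillis + " ms");
          timeout.initCause(e);
          throw timeout;
        }
      }
      Thread.sleep(intervalMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    // Simulated store: reports length 0 for the first two probes, then 1024,
    // standing in for an uploaded object that only becomes visible later.
    AtomicInteger calls = new AtomicInteger();
    Long len = eventually(5_000, 10, () -> {
      long observed = calls.incrementAndGet() < 3 ? 0 : 1024;
      if (observed <= 0) {
        throw new AssertionError("File is empty");
      }
      return observed;
    });
    System.out.println(len + " bytes after " + calls.get() + " probes");
  }
}
```

Running `main` prints `1024 bytes after 3 probes`: the first two probes fail and the third succeeds well inside the 5-second budget.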