Skip to content

Commit

Permalink
Use the same transfer manager throughout.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Jun 24, 2016
1 parent 57b94db commit e10e3ed
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 14 deletions.
18 changes: 6 additions & 12 deletions underfs/s3a/src/main/java/alluxio/underfs/s3a/S3AOutputStream.java
Expand Up @@ -54,9 +54,6 @@ public class S3AOutputStream extends OutputStream {
/** The local file that will be uploaded when the stream is closed. */ /** The local file that will be uploaded when the stream is closed. */
private final File mFile; private final File mFile;


/** The AmazonS3 client for S3 operations. */
private final AmazonS3 mClient;

/** Flag to indicate this stream has been closed, to ensure close is only done once. */ /** Flag to indicate this stream has been closed, to ensure close is only done once. */
private final AtomicBoolean mClosed = new AtomicBoolean(false); private final AtomicBoolean mClosed = new AtomicBoolean(false);


Expand All @@ -68,7 +65,7 @@ public class S3AOutputStream extends OutputStream {
* It is recommended (http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html) * It is recommended (http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html)
* to upload file larger than 100MB using Multipart Uploads. * to upload file larger than 100MB using Multipart Uploads.
*/ */
private final TransferManager mTransferManager; private final TransferManager mManager;


/** The output stream to a local file where the file will be buffered until closed. */ /** The output stream to a local file where the file will be buffered until closed. */
private OutputStream mLocalOutputStream; private OutputStream mLocalOutputStream;
Expand All @@ -81,17 +78,17 @@ public class S3AOutputStream extends OutputStream {
* *
* @param bucketName the name of the bucket * @param bucketName the name of the bucket
* @param key the key of the file * @param key the key of the file
* @param client the JetS3t client * @param manager the transfer manager to upload the file with
* @throws IOException when a non-Alluxio related error occurs * @throws IOException when a non-Alluxio related error occurs
*/ */
public S3AOutputStream(String bucketName, String key, AmazonS3 client) throws IOException { public S3AOutputStream(String bucketName, String key, TransferManager manager)
throws IOException {
Preconditions.checkArgument(bucketName != null && !bucketName.isEmpty(), "Bucket name must " Preconditions.checkArgument(bucketName != null && !bucketName.isEmpty(), "Bucket name must "
+ "not be null or empty."); + "not be null or empty.");
mBucketName = bucketName; mBucketName = bucketName;
mKey = key; mKey = key;
mClient = client; mManager = manager;
mFile = new File(PathUtils.concatPath("/tmp", UUID.randomUUID())); mFile = new File(PathUtils.concatPath("/tmp", UUID.randomUUID()));
mTransferManager = new TransferManager(mClient);
try { try {
mHash = MessageDigest.getInstance("MD5"); mHash = MessageDigest.getInstance("MD5");
mLocalOutputStream = mLocalOutputStream =
Expand Down Expand Up @@ -135,11 +132,8 @@ public void close() throws IOException {
if (mHash != null) { if (mHash != null) {
meta.setContentMD5(new String(Base64.encode(mHash.digest()))); meta.setContentMD5(new String(Base64.encode(mHash.digest())));
} }

PutObjectRequest putReq = new PutObjectRequest(mBucketName, mKey, mFile).withMetadata(meta); PutObjectRequest putReq = new PutObjectRequest(mBucketName, mKey, mFile).withMetadata(meta);

mManager.upload(putReq).waitForUploadResult();
mTransferManager.upload(putReq).waitForUploadResult();

if (!mFile.delete()) { if (!mFile.delete()) {
LOG.error("Failed to delete temporary file @ {}", mFile.getPath()); LOG.error("Failed to delete temporary file @ {}", mFile.getPath());
} }
Expand Down
Expand Up @@ -83,7 +83,7 @@ public class S3AUnderFileSystem extends UnderFileSystem {
/** /**
* Constructs a new instance of {@link S3AUnderFileSystem}. * Constructs a new instance of {@link S3AUnderFileSystem}.
* *
* @param uri the {@link alluxio.AlluxioURI} for this UFS * @param uri the {@link AlluxioURI} for this UFS
* @param conf the configuration for Alluxio * @param conf the configuration for Alluxio
*/ */
public S3AUnderFileSystem(AlluxioURI uri, Configuration conf) { public S3AUnderFileSystem(AlluxioURI uri, Configuration conf) {
Expand Down Expand Up @@ -124,7 +124,7 @@ public void connectFromWorker(Configuration conf, String hostname) {
@Override @Override
public OutputStream create(String path) throws IOException { public OutputStream create(String path) throws IOException {
if (mkdirs(getParentKey(path), true)) { if (mkdirs(getParentKey(path), true)) {
return new S3AOutputStream(mBucketName, stripPrefixIfPresent(path), mClient); return new S3AOutputStream(mBucketName, stripPrefixIfPresent(path), mManager);
} }
return null; return null;
} }
Expand Down

0 comments on commit e10e3ed

Please sign in to comment.