Skip to content

Commit

Permalink
S3BinaryCacheStore: Allow disabling multipart uploads
Browse files Browse the repository at this point in the history
The use of TransferManager has several issues, including that it
doesn't allow setting a Content-Encoding without a patch, and it
doesn't handle exceptions in worker threads (causing termination on
memory allocation failure).

Fixes #2493.
  • Loading branch information
edolstra committed Oct 30, 2018
1 parent 0163e89 commit 9f99d62
Showing 1 changed file with 57 additions and 31 deletions.
88 changes: 57 additions & 31 deletions src/libstore/s3-binary-cache-store.cc
Expand Up @@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
const Setting<bool> multipartUpload{
this, false, "multipart-upload", "whether to use multi-part uploads"};
const Setting<uint64_t> bufferSize{
this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};

Expand Down Expand Up @@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);

std::call_once(transferManagerCreated, [&]() {
std::call_once(transferManagerCreated, [&]()
{
if (multipartUpload) {
TransferManagerConfiguration transferConfig(executor.get());

transferConfig.s3Client = s3Helper.client;
transferConfig.bufferSize = bufferSize;

transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager,
const std::shared_ptr<const TransferHandle>
&transferHandle)
{
//FIXME: find a way to properly abort the multipart upload.
//checkInterrupt();
debug("upload progress ('%s'): '%d' of '%d' bytes",
transferHandle->GetKey(),
transferHandle->GetBytesTransferred(),
transferHandle->GetBytesTotalSize());
};

transferManager = TransferManager::Create(transferConfig);
}
});

TransferManagerConfiguration transferConfig(executor.get());
auto now1 = std::chrono::steady_clock::now();

transferConfig.s3Client = s3Helper.client;
transferConfig.bufferSize = bufferSize;
if (transferManager) {

transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager,
const std::shared_ptr<const TransferHandle>
&transferHandle)
{
//FIXME: find a way to properly abort the multipart upload.
//checkInterrupt();
debug("upload progress ('%s'): '%d' of '%d' bytes",
transferHandle->GetKey(),
transferHandle->GetBytesTransferred(),
transferHandle->GetBytesTotalSize());
};
std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile(
stream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(),
nullptr, contentEncoding);

transferManager = TransferManager::Create(transferConfig);
});
transferHandle->WaitUntilFinished();

auto now1 = std::chrono::steady_clock::now();
if (transferHandle->GetStatus() == TransferStatus::FAILED)
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
bucketName, path, transferHandle->GetLastError().GetMessage());

std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile(
stream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(),
nullptr, contentEncoding);
if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
bucketName, path);

transferHandle->WaitUntilFinished();
} else {

if (transferHandle->GetStatus() == TransferStatus::FAILED)
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
bucketName, path, transferHandle->GetLastError().GetMessage());
auto request =
Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName)
.WithKey(path);

if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
bucketName, path);
request.SetContentType(mimeType);

if (contentEncoding != "")
request.SetContentEncoding(contentEncoding);

auto stream = std::make_shared<istringstream_nocopy>(data);

request.SetBody(stream);

auto result = checkAws(fmt("AWS error uploading '%s'", path),
s3Helper.client->PutObject(request));
}

printTalkative("upload of '%s' completed", path);

Expand Down

0 comments on commit 9f99d62

Please sign in to comment.