diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 13ee257ba25..c5c6b89b197 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
     const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
     const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
     const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
+    const Setting<bool> multipartUpload{
+        this, false, "multipart-upload", "whether to use multi-part uploads"};
     const Setting<uint64_t> bufferSize{
         this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
 
@@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
 
-        std::call_once(transferManagerCreated, [&]() {
+        std::call_once(transferManagerCreated, [&]()
+        {
+            if (multipartUpload) {
+                TransferManagerConfiguration transferConfig(executor.get());
+
+                transferConfig.s3Client = s3Helper.client;
+                transferConfig.bufferSize = bufferSize;
+
+                transferConfig.uploadProgressCallback =
+                    [](const TransferManager *transferManager,
+                        const std::shared_ptr<const TransferHandle>
+                        &transferHandle)
+                    {
+                        //FIXME: find a way to properly abort the multipart upload.
+                        //checkInterrupt();
+                        debug("upload progress ('%s'): '%d' of '%d' bytes",
+                            transferHandle->GetKey(),
+                            transferHandle->GetBytesTransferred(),
+                            transferHandle->GetBytesTotalSize());
+                    };
+
+                transferManager = TransferManager::Create(transferConfig);
+            }
+        });
 
-            TransferManagerConfiguration transferConfig(executor.get());
+        auto now1 = std::chrono::steady_clock::now();
 
-            transferConfig.s3Client = s3Helper.client;
-            transferConfig.bufferSize = bufferSize;
+        if (transferManager) {
 
-            transferConfig.uploadProgressCallback =
-                [](const TransferManager *transferManager,
-                    const std::shared_ptr<const TransferHandle>
-                    &transferHandle)
-                {
-                    //FIXME: find a way to properly abort the multipart upload.
-                    //checkInterrupt();
-                    debug("upload progress ('%s'): '%d' of '%d' bytes",
-                        transferHandle->GetKey(),
-                        transferHandle->GetBytesTransferred(),
-                        transferHandle->GetBytesTotalSize());
-                };
+            std::shared_ptr<TransferHandle> transferHandle =
+                transferManager->UploadFile(
+                    stream, bucketName, path, mimeType,
+                    Aws::Map<Aws::String, Aws::String>(),
+                    nullptr, contentEncoding);
 
-            transferManager = TransferManager::Create(transferConfig);
-        });
+            transferHandle->WaitUntilFinished();
 
-        auto now1 = std::chrono::steady_clock::now();
+            if (transferHandle->GetStatus() == TransferStatus::FAILED)
+                throw Error("AWS error: failed to upload 's3://%s/%s': %s",
+                    bucketName, path, transferHandle->GetLastError().GetMessage());
 
-        std::shared_ptr<TransferHandle> transferHandle =
-            transferManager->UploadFile(
-                stream, bucketName, path, mimeType,
-                Aws::Map<Aws::String, Aws::String>(),
-                nullptr, contentEncoding);
+            if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+                throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
+                    bucketName, path);
 
-        transferHandle->WaitUntilFinished();
+        } else {
 
-        if (transferHandle->GetStatus() == TransferStatus::FAILED)
-            throw Error("AWS error: failed to upload 's3://%s/%s': %s",
-                bucketName, path, transferHandle->GetLastError().GetMessage());
+            auto request =
+                Aws::S3::Model::PutObjectRequest()
+                .WithBucket(bucketName)
+                .WithKey(path);
 
-        if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
-            throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
-                bucketName, path);
+            request.SetContentType(mimeType);
+
+            if (contentEncoding != "")
+                request.SetContentEncoding(contentEncoding);
+
+            auto stream = std::make_shared<std::stringstream>(data);
+
+            request.SetBody(stream);
+
+            auto result = checkAws(fmt("AWS error uploading '%s'", path),
+                s3Helper.client->PutObject(request));
+        }
 
         printTalkative("upload of '%s' completed", path);
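
For context on the new fallback branch: when `multipart-upload` is left at its default of `false`, the upload becomes a single `PutObject` request. The sketch below is a minimal, standalone illustration of that pattern with the AWS SDK for C++, not the Nix code itself; the `uploadSingle` helper, the bucket/key names, and the default-constructed client are assumptions made for the example.

```cpp
// Minimal sketch of a single-request S3 upload with the AWS SDK for C++,
// mirroring the PutObject fallback path added in this diff.
// Helper name, bucket, and key are illustrative only.
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>

void uploadSingle(Aws::S3::S3Client & client,
    const std::string & bucket, const std::string & key,
    const std::string & data, const std::string & mimeType,
    const std::string & contentEncoding)
{
    auto request = Aws::S3::Model::PutObjectRequest()
        .WithBucket(bucket)
        .WithKey(key);

    request.SetContentType(mimeType);
    if (!contentEncoding.empty())
        request.SetContentEncoding(contentEncoding);

    // The body must be an Aws::IOStream; a stringstream copy of the data works.
    request.SetBody(std::make_shared<std::stringstream>(data));

    auto outcome = client.PutObject(request);
    if (!outcome.IsSuccess())
        throw std::runtime_error("AWS error uploading '" + key + "': "
            + std::string(outcome.GetError().GetMessage().c_str()));
}

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client;
        uploadSingle(client, "example-bucket", "example-key",
            "hello", "text/plain", "");
    }
    Aws::ShutdownAPI(options);
}
```

Like the branch in the diff, this buffers the whole object body in a `std::stringstream`, which is acceptable here because the data is already held in memory as a string; the multipart path remains available behind the new setting.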
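The multipart branch kept behind the new setting goes through the SDK's `TransferManager`, which splits the stream into parts of `buffer-size` bytes, uploads them on a thread pool, and reports progress through the callback shown in the diff. Below is a minimal standalone sketch of that usage under the same assumptions as above (bucket/key names, part size, and the default-constructed client are illustrative); the interrupt/abort question flagged by the FIXME is left open here as well.

```cpp
// Minimal sketch of a multipart upload via Aws::Transfer::TransferManager,
// mirroring the TransferManager branch in the diff. Names are illustrative only.
#include <aws/core/Aws.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
#include <aws/transfer/TransferManager.h>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        auto client = std::make_shared<Aws::S3::S3Client>();
        auto executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(4);

        Aws::Transfer::TransferManagerConfiguration config(executor.get());
        config.s3Client = client;
        config.bufferSize = 5 * 1024 * 1024;   // part size, as in the buffer-size setting

        // Progress callback analogous to the one in the diff; it only reports,
        // it does not abort the transfer (that is the FIXME in the Nix code).
        config.uploadProgressCallback =
            [](const Aws::Transfer::TransferManager *,
               const std::shared_ptr<const Aws::Transfer::TransferHandle> & handle)
            {
                std::cerr << handle->GetKey() << ": "
                          << handle->GetBytesTransferred() << " of "
                          << handle->GetBytesTotalSize() << " bytes\n";
            };

        auto transferManager = Aws::Transfer::TransferManager::Create(config);

        // 16 MiB of dummy data, so the upload actually spans several parts.
        auto stream = std::make_shared<std::stringstream>(std::string(16 * 1024 * 1024, 'x'));
        auto handle = transferManager->UploadFile(
            stream, "example-bucket", "example-key", "application/octet-stream",
            Aws::Map<Aws::String, Aws::String>());

        handle->WaitUntilFinished();
        if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED)
            std::cerr << "upload failed: " << handle->GetLastError().GetMessage() << "\n";
    }
    Aws::ShutdownAPI(options);
}
```

Building this sketch requires linking against aws-cpp-sdk-transfer in addition to aws-cpp-sdk-s3 and aws-cpp-sdk-core.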