Skip to content

Commit

Permalink
feat: Transfer Manager ParallelCompositeUploads (#2494)
Browse files Browse the repository at this point in the history
  • Loading branch information
sydney-munro committed Apr 11, 2024
1 parent d47afcf commit 8b54549
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,30 @@
final class DefaultQos implements Qos {

private final long divideAndConquerThreshold;
private final long parallelCompositeUploadThreshold;
private boolean threadThresholdMet;

private DefaultQos(long divideAndConquerThreshold) {
private DefaultQos(
long divideAndConquerThreshold,
long parallelCompositeUploadThreshold,
boolean threadThresholdMet) {
this.divideAndConquerThreshold = divideAndConquerThreshold;
this.parallelCompositeUploadThreshold = parallelCompositeUploadThreshold;
this.threadThresholdMet = threadThresholdMet;
}

@Override
public boolean divideAndConquer(long objectSize) {
return objectSize > divideAndConquerThreshold;
}

static DefaultQos of() {
return of(128L * 1024 * 1024);
@Override
public boolean parallelCompositeUpload(long objectSize) {
return threadThresholdMet && objectSize > parallelCompositeUploadThreshold;
}

static DefaultQos of(long divideAndConquerThreshold) {
return new DefaultQos(divideAndConquerThreshold);
static DefaultQos of(TransferManagerConfig config) {
return new DefaultQos(
128L * 1024 * 1024, 4L * config.getPerWorkerBufferSize(), config.getMaxWorkers() > 2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.storage.transfermanager;

import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSession;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ParallelCompositeUploadCallable implements Callable<UploadResult> {
private final Storage storage;

private final BlobInfo originalBlob;

private final Path sourceFile;

private final ParallelUploadConfig parallelUploadConfig;

private final Storage.BlobWriteOption[] opts;

public ParallelCompositeUploadCallable(
Storage storage,
BlobInfo originalBlob,
Path sourceFile,
ParallelUploadConfig parallelUploadConfig,
BlobWriteOption[] opts) {
this.storage = storage;
this.originalBlob = originalBlob;
this.sourceFile = sourceFile;
this.parallelUploadConfig = parallelUploadConfig;
this.opts = opts;
}

public UploadResult call() {
return uploadPCU();
}

private UploadResult uploadPCU() {
BlobWriteSession session = storage.blobWriteSession(originalBlob, opts);
try (WritableByteChannel writableByteChannel = session.open();
FileChannel fc = FileChannel.open(sourceFile, StandardOpenOption.READ)) {
ByteStreams.copy(fc, writableByteChannel);
} catch (StorageException e) {
if (parallelUploadConfig.isSkipIfExists() && e.getCode() == 412) {
return UploadResult.newBuilder(originalBlob, TransferStatus.SKIPPED)
.setException(e)
.build();
} else {
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
}
} catch (Exception e) {
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
}
try {
ApiFuture<BlobInfo> result = session.getResult();
BlobInfo newBlob = result.get(10, TimeUnit.SECONDS);
return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setUploadedBlob(newBlob)
.build();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@
interface Qos {

boolean divideAndConquer(long objectSize);

boolean parallelCompositeUpload(long objectSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.BetaApi;
import com.google.cloud.storage.BlobInfo;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -58,7 +59,7 @@ public interface TransferManager extends AutoCloseable {
*/
@BetaApi
@NonNull
UploadJob uploadFiles(List<Path> files, ParallelUploadConfig config);
UploadJob uploadFiles(List<Path> files, ParallelUploadConfig config) throws IOException;

/**
* Downloads a list of blobs in parallel. This operation will not block the invoking thread,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,18 @@ public final class TransferManagerConfig {
private final boolean allowParallelCompositeUpload;

private final StorageOptions storageOptions;
private final Qos qos;

TransferManagerConfig(
int maxWorkers,
int perWorkerBufferSize,
boolean allowDivideAndConquerDownload,
boolean allowParallelCompositeUpload,
StorageOptions storageOptions,
Qos qos) {
StorageOptions storageOptions) {
this.maxWorkers = maxWorkers;
this.perWorkerBufferSize = perWorkerBufferSize;
this.allowDivideAndConquerDownload = allowDivideAndConquerDownload;
this.allowParallelCompositeUpload = allowParallelCompositeUpload;
this.storageOptions = storageOptions;
this.qos = qos;
}

/**
Expand Down Expand Up @@ -86,6 +83,15 @@ public boolean isAllowDivideAndConquerDownload() {
* chunking will be beneficial
*
* @see Builder#setAllowParallelCompositeUpload(boolean)
* <p>Note: Performing parallel composite uploads costs more money. <a
* href="https://cloud.google.com/storage/pricing#operations-by-class">Class A</a> operations
* are performed to create each part and to perform each compose. If a storage tier other than
* <a href="https://cloud.google.com/storage/docs/storage-classes"><code>STANDARD</code></a>
* is used, early deletion fees apply to deletion of the parts.
* <p>Please see the <a
* href="https://cloud.google.com/storage/docs/parallel-composite-uploads">Parallel composite
* uploads</a> documentation for a more in depth explanation of the limitations of Parallel
* composite uploads.
*/
@BetaApi
public boolean isAllowParallelCompositeUpload() {
Expand All @@ -105,7 +111,7 @@ public StorageOptions getStorageOptions() {
/** The service object for {@link TransferManager} */
@BetaApi
public TransferManager getService() {
return new TransferManagerImpl(this);
return new TransferManagerImpl(this, DefaultQos.of(this));
}

@BetaApi
Expand All @@ -115,14 +121,9 @@ public Builder toBuilder() {
.setAllowParallelCompositeUpload(allowParallelCompositeUpload)
.setMaxWorkers(maxWorkers)
.setPerWorkerBufferSize(perWorkerBufferSize)
.setQos(qos)
.setStorageOptions(storageOptions);
}

Qos getQos() {
return qos;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -179,15 +180,13 @@ public static class Builder {
private boolean allowParallelCompositeUpload;

private StorageOptions storageOptions;
private Qos qos;

private Builder() {
this.perWorkerBufferSize = 16 * 1024 * 1024;
this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors();
this.allowDivideAndConquerDownload = false;
this.allowParallelCompositeUpload = false;
this.storageOptions = StorageOptions.getDefaultInstance();
this.qos = DefaultQos.of();
}

/**
Expand Down Expand Up @@ -244,8 +243,8 @@ public Builder setAllowDivideAndConquerDownload(boolean allowDivideAndConquerDow
* @see TransferManagerConfig#isAllowDivideAndConquerDownload()
*/
@BetaApi
public Builder setAllowParallelCompositeUpload(boolean allowDivideAndConquerDownload) {
this.allowDivideAndConquerDownload = allowDivideAndConquerDownload;
public Builder setAllowParallelCompositeUpload(boolean allowParallelCompositeUpload) {
this.allowParallelCompositeUpload = allowParallelCompositeUpload;
return this;
}

Expand All @@ -263,12 +262,6 @@ public Builder setStorageOptions(StorageOptions storageOptions) {
return this;
}

@BetaApi
Builder setQos(Qos qos) {
this.qos = qos;
return this;
}

/**
* Creates a TransferManagerConfig object.
*
Expand All @@ -281,8 +274,7 @@ public TransferManagerConfig build() {
perWorkerBufferSize,
allowDivideAndConquerDownload,
allowParallelCompositeUpload,
storageOptions,
qos);
storageOptions);
}
}
}

0 comments on commit 8b54549

Please sign in to comment.