From 67a7c6b6582b5afab9272b32e01897d009e7c0bf Mon Sep 17 00:00:00 2001 From: Sydney Munro <97561403+sydney-munro@users.noreply.github.com> Date: Mon, 6 May 2024 12:00:07 -0700 Subject: [PATCH] fix: ParallelCompositeUpload in Transfer Manager hangs when encountering OOM (#2526) * fix: ParallelCompositeUpload in Transfer Manager hangs when encountering OOM * lint * include missing import --- .../transfermanager/TransferManagerImpl.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 4bbbeb939..5489c1c89 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -26,6 +26,7 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BlobWriteSessionConfigs; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; @@ -83,7 +84,11 @@ final class TransferManagerImpl implements TransferManager { if (transferManagerConfig.isAllowParallelCompositeUpload()) { ParallelCompositeUploadBlobWriteSessionConfig pcuConfig = BlobWriteSessionConfigs.parallelCompositeUpload() - .withExecutorSupplier(ExecutorSupplier.useExecutor(executor)); + .withExecutorSupplier(ExecutorSupplier.useExecutor(executor)) + .withBufferAllocationStrategy( + BufferAllocationStrategy.fixedPool( + transferManagerConfig.getMaxWorkers(), + transferManagerConfig.getPerWorkerBufferSize())); storageOptions = storageOptions.toBuilder().setBlobWriteSessionConfig(pcuConfig).build(); } this.pcuQueue = new ConcurrentLinkedDeque<>(); @@ -264,8 +269,13 @@ public void run() { return; } - UploadResult result = poll.callable.call(); - poll.resultFuture.set(result); + try { + UploadResult result = poll.callable.call(); + poll.resultFuture.set(result); + } catch (Throwable e) { + poll.resultFuture.setException(e); + throw e; + } } while (true); }