diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java index 914b571ba6..c154fdc32d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java @@ -38,9 +38,11 @@ import com.google.common.io.BaseEncoding; import com.google.common.io.CountingOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.OutputStream; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.security.Key; @@ -73,7 +75,8 @@ public Blob apply(Tuple pb) { } }; - private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024; + private static final int DEFAULT_CHUNK_SIZE = 15 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 256 * 1024; /** Class for specifying blob source options when {@code Blob} methods are used. */ public static class BlobSourceOption extends Option { @@ -260,6 +263,88 @@ public void downloadTo(Path path) { downloadTo(path, new BlobSourceOption[0]); } + /** + * Uploads the given file path to this blob using specified blob write options. + * + * @param path file to upload + * @param options blob write options + * @return updated blob + * @throws IOException on I/O error + * @throws StorageException on failure + */ + public Blob uploadFrom(Path path, BlobWriteOption... options) throws IOException { + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + try (InputStream input = Files.newInputStream(path)) { + return uploadFrom(input, options); + } + } + + /** + * Uploads the given content to this blob using specified blob write options. + * + * @param input content to upload + * @param options blob write options + * @return updated blob + * @throws IOException on I/O error + * @throws StorageException on failure + */ + public Blob uploadFrom(InputStream input, BlobWriteOption... options) throws IOException { + try (WriteChannel writer = storage.writer(this, options)) { + uploadFrom(input, writer); + } + BlobId blobId = getBlobId(); + try { + return storage.get(BlobId.of(blobId.getBucket(), blobId.getName())); + } catch (StorageException e) { + throw new StorageException( + e.getCode(), "Content has been uploaded successfully. Failed to retrieve blob.", e); + } + } + + static void uploadFrom(InputStream input, WriteChannel writer) throws IOException { + uploadFrom(input, writer, DEFAULT_CHUNK_SIZE); + } + + /** + * Uploads the given content to the storage using specified write channel and the given buffer + * size. Other uploadFrom() methods invoke this one with a buffer size of 15 MiB. Users can pass + * alternative values. Larger buffer sizes might improve the upload performance but require more + * memory. This can cause an OutOfMemoryError or add significant garbage collection overhead. + * Smaller buffer sizes reduce memory consumption, that is noticeable when uploading many objects + * in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB. + * + *

This method does not close either the InputStream or the WriterChannel. + * + *

Example of uploading: + * + *

{@code
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
+   * Path file = Paths.get("humongous.file");
+   * try (InputStream input = Files.newInputStream(file); WriteChannel writer = storage.writer(blobInfo)) {
+   *   Blob.uploadFrom(input, writer, 150 * 1024 * 1024);
+   * } catch (IOException e) {
+   *   // your handler
+   * }
+   * }
+ * + * @param input content to upload + * @param writer channel + * @param bufferSize size of the buffer to read from input and send over writer + * @throws IOException on I/O error + */ + public static void uploadFrom(InputStream input, WriteChannel writer, int bufferSize) + throws IOException { + bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE); + byte[] buffer = new byte[bufferSize]; + int length; + while ((length = input.read(buffer)) >= 0) { + writer.write(ByteBuffer.wrap(buffer, 0, length)); + } + } + /** Builder for {@code Blob}. */ public static class Builder extends BlobInfo.Builder { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java index 6f91f968a0..4c91c7db0b 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java @@ -32,11 +32,13 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.api.core.ApiClock; import com.google.api.gax.retrying.RetrySettings; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Project; import com.google.cloud.storage.Acl.Project.ProjectRole; import com.google.cloud.storage.Acl.Role; @@ -48,10 +50,17 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.BaseEncoding; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.Key; import java.util.List; import java.util.Map; @@ -586,7 +595,7 @@ public void testBuilder() { } @Test - public void testDownload() throws Exception { + public void testDownloadTo() throws Exception { final byte[] expected = {1, 2}; StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); expect(storage.getOptions()).andReturn(mockOptions).times(1); @@ -618,7 +627,7 @@ public Long answer() throws Throwable { } @Test - public void testDownloadWithRetries() throws Exception { + public void testDownloadToWithRetries() throws Exception { final byte[] expected = {1, 2}; StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); expect(storage.getOptions()).andReturn(mockOptions); @@ -662,4 +671,135 @@ public Long answer() throws Throwable { byte actual[] = Files.readAllBytes(file.toPath()); assertArrayEquals(expected, actual); } + + @Test + public void testUploadFromNonExistentFile() { + initializeExpectedBlob(1); + expect(storage.getOptions()).andReturn(mockOptions); + replay(storage); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + String fileName = "non_existing_file.txt"; + try { + blob.uploadFrom(Paths.get(fileName)); + fail(); + } catch (IOException e) { + assertEquals(NoSuchFileException.class, e.getClass()); + assertEquals(fileName, e.getMessage()); + } + } + + @Test + public void testUploadFromDirectory() throws IOException { + initializeExpectedBlob(1); + expect(storage.getOptions()).andReturn(mockOptions); + replay(storage); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + Path dir = Files.createTempDirectory("unit_"); + try { + blob.uploadFrom(dir); + fail(); + } catch (StorageException e) { + assertEquals(dir + " is a directory", e.getMessage()); + } + } + + private WriteChannel createWriteChannelMock(byte[] bytes) throws Exception { + WriteChannel channel = createMock(WriteChannel.class); + ByteBuffer expectedByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length); + expect(channel.write(expectedByteBuffer)).andReturn(bytes.length); + channel.close(); + replay(channel); + return channel; + } + + private Blob createBlobForUpload(WriteChannel channel) { + initializeExpectedBlob(1); + BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName()); + expect(storage.getOptions()).andReturn(mockOptions); + expect(storage.writer(eq(expectedBlob))).andReturn(channel); + expect(storage.get(blobId)).andReturn(expectedBlob); + replay(storage); + return new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + } + + @Test + public void testUploadFromFile() throws Exception { + byte[] dataToSend = {1, 2, 3}; + WriteChannel channel = createWriteChannelMock(dataToSend); + blob = createBlobForUpload(channel); + Path tempFile = Files.createTempFile("testUpload", ".tmp"); + Files.write(tempFile, dataToSend); + blob = blob.uploadFrom(tempFile); + assertSame(expectedBlob, blob); + } + + @Test + public void testUploadFromStream() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5}; + WriteChannel channel = createWriteChannelMock(dataToSend); + blob = createBlobForUpload(channel); + InputStream input = new ByteArrayInputStream(dataToSend); + blob = blob.uploadFrom(input); + assertSame(expectedBlob, blob); + } + + @Test + public void testUploadFromStreamRetrieveFailed() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5}; + StorageException storageException = new StorageException(123, "message"); + WriteChannel channel = createWriteChannelMock(dataToSend); + initializeExpectedBlob(1); + BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName()); + expect(storage.getOptions()).andReturn(mockOptions); + expect(storage.writer(eq(expectedBlob))).andReturn(channel); + expect(storage.get(blobId)).andThrow(storageException); + replay(storage); + Blob blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + InputStream input = new ByteArrayInputStream(dataToSend); + try { + blob.uploadFrom(input); + fail(); + } catch (StorageException e) { + assertEquals( + "Content has been uploaded successfully. Failed to retrieve blob.", e.getMessage()); + assertSame(e.getCause(), storageException); + } + } + + @Test + public void testUpload() throws Exception { + replay(storage); + byte[] dataToSend = {1, 2, 3, 4, 5}; + WriteChannel channel = createWriteChannelMock(dataToSend); + InputStream input = new ByteArrayInputStream(dataToSend); + Blob.uploadFrom(input, channel); + } + + @Test + public void testUploadSmallBufferSize() throws Exception { + replay(storage); + byte[] dataToSend = new byte[100_000]; + WriteChannel channel = createWriteChannelMock(dataToSend); + InputStream input = new ByteArrayInputStream(dataToSend); + Blob.uploadFrom(input, channel, 100); + } + + @Test + public void testUploadMultiplePortions() throws Exception { + replay(storage); + int totalSize = 400_000; + int bufferSize = 300_000; + byte[] dataToSend = new byte[totalSize]; + dataToSend[0] = 42; + dataToSend[bufferSize] = 43; + + WriteChannel channel = createMock(WriteChannel.class); + expect(channel.write(ByteBuffer.wrap(dataToSend, 0, bufferSize))).andReturn(bufferSize); + expect(channel.write(ByteBuffer.wrap(dataToSend, bufferSize, totalSize - bufferSize))) + .andReturn(bufferSize - bufferSize); + channel.close(); + replay(channel); + InputStream input = new ByteArrayInputStream(dataToSend); + Blob.uploadFrom(input, channel, bufferSize); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java index e1788e6b6e..b4f655b7cf 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java @@ -39,6 +39,7 @@ import com.google.cloud.Policy; import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; +import com.google.cloud.RetryHelper; import com.google.cloud.TransportOptions; import com.google.cloud.WriteChannel; import com.google.cloud.http.HttpTransportOptions; @@ -101,6 +102,8 @@ import java.net.URL; import java.net.URLConnection; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.Key; import java.util.Arrays; import java.util.Collections; @@ -2939,4 +2942,56 @@ public void testBucketLogging() throws ExecutionException, InterruptedException RemoteStorageHelper.forceDelete(storage, loggingBucket, 5, TimeUnit.SECONDS); } } + + @Test + public void testUpload() throws Exception { + String blobName = "test-upload-static"; + BlobId blobId = BlobId.of(BUCKET, blobName); + try (WriteChannel writer = storage.writer(BlobInfo.newBuilder(blobId).build())) { + Blob.uploadFrom(new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8)), writer, 1); + } + Blob blob = storage.get(blobId); + String readString = new String(blob.getContent(), UTF_8); + assertEquals(BLOB_STRING_CONTENT, readString); + } + + @Test + public void testUploadFromDownloadTo() throws Exception { + String blobName = "test-uploadFrom-downloadTo-blob"; + BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build(); + + Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp"); + Files.write(tempFileFrom, BLOB_BYTE_CONTENT); + Blob blob = storage.create(blobInfo); + blob = blob.uploadFrom(tempFileFrom); + + Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp"); + blob.downloadTo(tempFileTo); + byte[] readBytes = Files.readAllBytes(tempFileTo); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } + + @Test + public void testUploadFromDownloadToWithEncryption() throws Exception { + String blobName = "test-uploadFrom-downloadTo-withEncryption-blob"; + BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build(); + + Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp"); + Files.write(tempFileFrom, BLOB_BYTE_CONTENT); + Blob blob = storage.create(blobInfo); + blob = blob.uploadFrom(tempFileFrom, Storage.BlobWriteOption.encryptionKey(KEY)); + + Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp"); + try { + blob.downloadTo(tempFileTo); + } catch (RetryHelper.RetryHelperException e) { + // Expected to be StorageException + String expectedMessage = + "The target object is encrypted by a customer-supplied encryption key."; + assertTrue(e.getMessage().contains(expectedMessage)); + } + blob.downloadTo(tempFileTo, Blob.BlobSourceOption.decryptionKey(KEY)); + byte[] readBytes = Files.readAllBytes(tempFileTo); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } }