Skip to content

Commit

Permalink
feat: add upload functionality (#214)
Browse files Browse the repository at this point in the history
feat: add upload functionality

* feat: add upload functionality

* fix: review comments
  • Loading branch information
dmitry-fa committed Apr 1, 2020
1 parent be74072 commit 7beb99d
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 0 deletions.
@@ -0,0 +1,188 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.cloud.WriteChannel;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/** Utility methods to perform various operations with the Storage such as upload. */
/** Utility methods to perform various operations with the Storage such as upload. */
public final class StorageOperations {

  private final Storage storage;

  /** Default buffer size: 15 MiB (a multiple of the 256 KiB minimum chunk granularity). */
  private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024;

  /** Smallest accepted buffer size; smaller requested sizes are rounded up to this value. */
  private static final int MIN_BUFFER_SIZE = 256 * 1024;

  /**
   * Creates a new StorageOperations instance associated with the given storage.
   *
   * @param storage the Storage
   */
  public StorageOperations(Storage storage) {
    this.storage = storage;
  }

  /**
   * Uploads {@code path} to the blob using {@link Storage#writer}. By default any MD5 and CRC32C
   * values in the given {@code blobInfo} are ignored unless requested via the {@link
   * Storage.BlobWriteOption#md5Match()} and {@link Storage.BlobWriteOption#crc32cMatch()} options.
   * Folder upload is not supported.
   *
   * <p>Example of uploading a file:
   *
   * <pre>{@code
   * String bucketName = "my-unique-bucket";
   * String fileName = "readme.txt";
   * BlobId blobId = BlobId.of(bucketName, fileName);
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
   * new StorageOperations(storage).upload(blobInfo, Paths.get(fileName));
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param path file to upload
   * @param options blob write options
   * @throws IOException on I/O error
   * @throws StorageException on failure
   * @see #upload(BlobInfo, Path, int, Storage.BlobWriteOption...)
   */
  public void upload(BlobInfo blobInfo, Path path, Storage.BlobWriteOption... options)
      throws IOException {
    upload(blobInfo, path, DEFAULT_BUFFER_SIZE, options);
  }

  /**
   * Uploads {@code path} to the blob using {@link Storage#writer} and {@code bufferSize}. By
   * default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested
   * via the {@link Storage.BlobWriteOption#md5Match()} and {@link
   * Storage.BlobWriteOption#crc32cMatch()} options. Folder upload is not supported.
   *
   * <p>{@link #upload(BlobInfo, Path, Storage.BlobWriteOption...)} invokes this method with a
   * buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the
   * upload performance but require more memory. This can cause an OutOfMemoryError or add
   * significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, that
   * is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are
   * treated as 256 KiB.
   *
   * <p>Example of uploading a humongous file:
   *
   * <pre>{@code
   * BlobId blobId = BlobId.of(bucketName, blobName);
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
   *
   * int largeBufferSize = 150 * 1024 * 1024;
   * Path file = Paths.get("humongous.file");
   * new StorageOperations(storage).upload(blobInfo, file, largeBufferSize);
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param path file to upload
   * @param bufferSize size of the buffer for I/O operations
   * @param options blob write options
   * @throws IOException on I/O error
   * @throws StorageException on failure
   */
  public void upload(
      BlobInfo blobInfo, Path path, int bufferSize, Storage.BlobWriteOption... options)
      throws IOException {
    // Fail fast with a clear message: Files.newInputStream on a directory would otherwise
    // surface a less descriptive IOException on some platforms.
    if (Files.isDirectory(path)) {
      throw new StorageException(0, path + " is a directory");
    }
    try (InputStream input = Files.newInputStream(path)) {
      upload(blobInfo, input, bufferSize, options);
    }
  }

  /**
   * Reads bytes from an input stream and uploads those bytes to the blob using {@link
   * Storage#writer}. By default any MD5 and CRC32C values in the given {@code blobInfo} are ignored
   * unless requested via the {@link Storage.BlobWriteOption#md5Match()} and {@link
   * Storage.BlobWriteOption#crc32cMatch()} options.
   *
   * <p>Example of uploading data with CRC32C checksum:
   *
   * <pre>{@code
   * BlobId blobId = BlobId.of(bucketName, blobName);
   * byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
   * Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
   * String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
   * new StorageOperations(storage).upload(blobInfo, new ByteArrayInputStream(content),
   *     Storage.BlobWriteOption.crc32cMatch());
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param content input stream to read from
   * @param options blob write options
   * @throws IOException on I/O error
   * @throws StorageException on failure
   * @see #upload(BlobInfo, InputStream, int, Storage.BlobWriteOption...)
   */
  public void upload(BlobInfo blobInfo, InputStream content, Storage.BlobWriteOption... options)
      throws IOException {
    upload(blobInfo, content, DEFAULT_BUFFER_SIZE, options);
  }

  /**
   * Reads bytes from an input stream and uploads those bytes to the blob using {@link
   * Storage#writer} and {@code bufferSize}. By default any MD5 and CRC32C values in the given
   * {@code blobInfo} are ignored unless requested via the {@link
   * Storage.BlobWriteOption#md5Match()} and {@link Storage.BlobWriteOption#crc32cMatch()} options.
   *
   * <p>{@link #upload(BlobInfo, InputStream, Storage.BlobWriteOption...)} invokes this method
   * with a buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might
   * improve the upload performance but require more memory. This can cause an OutOfMemoryError or
   * add significant garbage collection overhead. Smaller buffer sizes reduce memory consumption,
   * that is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are
   * treated as 256 KiB.
   *
   * @param blobInfo blob to create
   * @param content input stream to read from
   * @param bufferSize size of the buffer for I/O operations
   * @param options blob write options
   * @throws IOException on I/O error
   * @throws StorageException on failure
   */
  public void upload(
      BlobInfo blobInfo, InputStream content, int bufferSize, Storage.BlobWriteOption... options)
      throws IOException {
    try (WriteChannel writer = storage.writer(blobInfo, options)) {
      upload(Channels.newChannel(content), writer, bufferSize);
    }
  }

  /*
   * Uploads the given content to the storage using the specified write channel and the given
   * buffer size. This method does not close any channels.
   */
  private static void upload(ReadableByteChannel reader, WriteChannel writer, int bufferSize)
      throws IOException {
    // Clamp to the minimum supported size; also propagate it as the writer's chunk size so
    // data is flushed in buffer-sized units.
    int chunkSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
    ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
    writer.setChunkSize(chunkSize);

    // read() may legally return 0 (no bytes available yet) without reaching end-of-stream;
    // skip the write in that case instead of issuing an empty write.
    while (reader.read(buffer) >= 0) {
      buffer.flip();
      if (buffer.hasRemaining()) {
        writer.write(buffer);
      }
      buffer.clear();
    }
  }
}
@@ -0,0 +1,173 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;

import com.google.cloud.WriteChannel;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/** Unit tests for {@link StorageOperations} using an EasyMock {@code Storage}. */
public class StorageOperationsTest {
  private Storage storage;
  private StorageOperations storageOperations;

  private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder("b", "n").build();
  // Mirror the (private) constants in StorageOperations so mock expectations can reference them.
  private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024;
  private static final int MIN_BUFFER_SIZE = 256 * 1024;

  @Before
  public void setUp() {
    storage = createStrictMock(Storage.class);
    storageOperations = new StorageOperations(storage);
  }

  @After
  public void tearDown() throws Exception {
    verify(storage);
  }

  @Test
  public void testUploadFromNonExistentFile() {
    replay(storage);
    String fileName = "non_existing_file.txt";
    // Exercise BOTH path overloads. In the original test the second upload call was
    // unreachable dead code because the first call always threw before it ran.
    try {
      storageOperations.upload(BLOB_INFO, Paths.get(fileName));
      fail();
    } catch (IOException e) {
      assertEquals(NoSuchFileException.class, e.getClass());
      assertEquals(fileName, e.getMessage());
    }
    try {
      storageOperations.upload(BLOB_INFO, Paths.get(fileName), -1);
      fail();
    } catch (IOException e) {
      assertEquals(NoSuchFileException.class, e.getClass());
      assertEquals(fileName, e.getMessage());
    }
  }

  @Test
  public void testUploadFromDirectory() throws IOException {
    replay(storage);
    Path dir = Files.createTempDirectory("unit_");
    // Exercise BOTH path overloads independently; previously the second call was unreachable.
    try {
      storageOperations.upload(BLOB_INFO, dir);
      fail();
    } catch (StorageException e) {
      assertEquals(dir + " is a directory", e.getMessage());
    }
    try {
      storageOperations.upload(BLOB_INFO, dir, -2);
      fail();
    } catch (StorageException e) {
      assertEquals(dir + " is a directory", e.getMessage());
    }
  }

  /** Sets up mock expectations for a single-chunk upload with the default buffer size. */
  private void prepareForUpload(BlobInfo blobInfo, byte[] bytes, Storage.BlobWriteOption... options)
      throws Exception {
    prepareForUpload(blobInfo, bytes, DEFAULT_BUFFER_SIZE, options);
  }

  /** Sets up mock expectations for a single-chunk upload with an explicit buffer size. */
  private void prepareForUpload(
      BlobInfo blobInfo, byte[] bytes, int bufferSize, Storage.BlobWriteOption... options)
      throws Exception {
    WriteChannel channel = createStrictMock(WriteChannel.class);
    ByteBuffer expectedByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length);
    channel.setChunkSize(bufferSize);
    expect(channel.write(expectedByteBuffer)).andReturn(bytes.length);
    channel.close();
    replay(channel);
    expect(storage.writer(blobInfo, options)).andReturn(channel);
    replay(storage);
  }

  @Test
  public void testUploadFromFile() throws Exception {
    byte[] dataToSend = {1, 2, 3};
    prepareForUpload(BLOB_INFO, dataToSend);
    Path tempFile = Files.createTempFile("testUpload", ".tmp");
    Files.write(tempFile, dataToSend);
    storageOperations.upload(BLOB_INFO, tempFile);
  }

  @Test
  public void testUploadFromStream() throws Exception {
    byte[] dataToSend = {1, 2, 3, 4, 5};
    Storage.BlobWriteOption[] options =
        new Storage.BlobWriteOption[] {Storage.BlobWriteOption.crc32cMatch()};
    prepareForUpload(BLOB_INFO, dataToSend, options);
    InputStream input = new ByteArrayInputStream(dataToSend);
    storageOperations.upload(BLOB_INFO, input, options);
  }

  @Test
  public void testUploadSmallBufferSize() throws Exception {
    // Buffer sizes below 256 KiB must be clamped up to MIN_BUFFER_SIZE.
    byte[] dataToSend = new byte[100_000];
    prepareForUpload(BLOB_INFO, dataToSend, MIN_BUFFER_SIZE);
    InputStream input = new ByteArrayInputStream(dataToSend);
    int smallBufferSize = 100;
    storageOperations.upload(BLOB_INFO, input, smallBufferSize);
  }

  @Test
  public void testUploadFromIOException() throws Exception {
    // An IOException thrown by the channel must propagate unchanged to the caller.
    IOException ioException = new IOException("message");
    WriteChannel channel = createStrictMock(WriteChannel.class);
    channel.setChunkSize(DEFAULT_BUFFER_SIZE);
    expect(channel.write((ByteBuffer) anyObject())).andThrow(ioException);
    replay(channel);
    expect(storage.writer(eq(BLOB_INFO))).andReturn(channel);
    replay(storage);
    InputStream input = new ByteArrayInputStream(new byte[10]);
    try {
      storageOperations.upload(BLOB_INFO, input);
      fail();
    } catch (IOException e) {
      assertSame(ioException, e);
    }
  }

  @Test
  public void testUploadMultiplePortions() throws Exception {
    // Content larger than the buffer must be written in buffer-sized portions.
    int totalSize = 400_000;
    int bufferSize = 300_000;
    byte[] dataToSend = new byte[totalSize];
    dataToSend[0] = 42;
    dataToSend[bufferSize] = 43;

    WriteChannel channel = createStrictMock(WriteChannel.class);
    channel.setChunkSize(bufferSize);
    expect(channel.write(ByteBuffer.wrap(dataToSend, 0, bufferSize))).andReturn(1);
    expect(channel.write(ByteBuffer.wrap(dataToSend, bufferSize, totalSize - bufferSize)))
        .andReturn(2);
    channel.close();
    replay(channel);
    expect(storage.writer(BLOB_INFO)).andReturn(channel);
    replay(storage);

    InputStream input = new ByteArrayInputStream(dataToSend);
    storageOperations.upload(BLOB_INFO, input, bufferSize);
  }
}

0 comments on commit 7beb99d

Please sign in to comment.