Skip to content

Commit

Permalink
feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling (
Browse files Browse the repository at this point in the history
#2489)

* feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling

* Update google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>

* add copyright header

* Apply suggestions from code review

Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>

* Extract classes to own files, more StorageClient initialization

* copyright headers on new files

* formatter

* one more lint issue

* fix: improve GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller#close() handling of multiple IOExceptions

* clean up ZeroCopyMarshallerTest

* add gprc core to pom

---------

Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
  • Loading branch information
JesseLovelace and BenWhitehead committed Apr 14, 2024
1 parent e2030b2 commit 8c7404d
Show file tree
Hide file tree
Showing 14 changed files with 767 additions and 78 deletions.
8 changes: 8 additions & 0 deletions google-cloud-storage/pom.xml
Expand Up @@ -96,6 +96,14 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
Expand Down
Expand Up @@ -48,19 +48,23 @@ public static GapicDownloadSessionBuilder create() {
* ultimately produced channel will not do any retries of its own.
*/
public ReadableByteChannelSessionBuilder byteChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
return new ReadableByteChannelSessionBuilder(read);
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager) {
return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager);
}

public static final class ReadableByteChannelSessionBuilder {

private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
private final ResponseContentLifecycleManager responseContentLifecycleManager;
private boolean autoGzipDecompression;
private Hasher hasher;

private ReadableByteChannelSessionBuilder(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager) {
this.read = read;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.hasher = Hasher.noop();
this.autoGzipDecompression = false;
}
Expand Down Expand Up @@ -100,11 +104,13 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
return (object, resultFuture) -> {
if (autoGzipDecompression) {
return new GzipReadableByteChannel(
new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher),
new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager),
ApiFutures.transform(
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
} else {
return new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher);
return new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager);
}
};
}
Expand Down
Expand Up @@ -16,15 +16,14 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.http.HttpStatusCodes;
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.protobuf.ByteString;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.Object;
import com.google.storage.v2.ReadObjectRequest;
Expand All @@ -46,25 +45,28 @@ final class GapicUnbufferedReadableByteChannel
private final ReadObjectRequest req;
private final Hasher hasher;
private final LazyServerStreamIterator iter;
private final ResponseContentLifecycleManager rclm;

private boolean open = true;
private boolean complete = false;
private long blobOffset;

private Object metadata;

private ByteBuffer leftovers;
private ResponseContentLifecycleHandle leftovers;

GapicUnbufferedReadableByteChannel(
SettableApiFuture<Object> result,
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ReadObjectRequest req,
Hasher hasher) {
Hasher hasher,
ResponseContentLifecycleManager rclm) {
this.result = result;
this.read = read;
this.req = req;
this.hasher = hasher;
this.blobOffset = req.getReadOffset();
this.rclm = rclm;
this.iter = new LazyServerStreamIterator();
}

Expand All @@ -82,15 +84,17 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity);
while (c.hasRemaining()) {
if (leftovers != null) {
copy(c, leftovers, dsts, offset, length);
leftovers.copy(c, dsts, offset, length);
if (!leftovers.hasRemaining()) {
leftovers.close();
leftovers = null;
}
continue;
}

if (iter.hasNext()) {
ReadObjectResponse resp = iter.next();
ResponseContentLifecycleHandle handle = rclm.get(resp);
if (resp.hasMetadata()) {
Object respMetadata = resp.getMetadata();
if (metadata == null) {
Expand All @@ -107,22 +111,24 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
}
}
ChecksummedData checksummedData = resp.getChecksummedData();
ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer();
// very important to know whether a crc32c value is set. Without checking, protobuf will
ByteString content = checksummedData.getContent();
int contentSize = content.size();
// Very important to know whether a crc32c value is set. Without checking, protobuf will
// happily return 0, which is a valid crc32c value.
if (checksummedData.hasCrc32C()) {
Crc32cLengthKnown expected =
Crc32cValue.of(checksummedData.getCrc32C(), checksummedData.getContent().size());
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
try {
hasher.validate(expected, content::duplicate);
hasher.validate(expected, content.asReadOnlyByteBufferList());
} catch (IOException e) {
close();
throw e;
}
}
copy(c, content, dsts, offset, length);
if (content.hasRemaining()) {
leftovers = content;
handle.copy(c, dsts, offset, length);
if (handle.hasRemaining()) {
leftovers = handle;
} else {
handle.close();
}
} else {
complete = true;
Expand All @@ -144,59 +150,26 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
open = false;
iter.close();
try {
if (leftovers != null) {
leftovers.close();
}
} finally {
iter.close();
}
}

ApiFuture<Object> getResult() {
return result;
}

private void copy(ReadCursor c, ByteBuffer content, ByteBuffer[] dsts, int offset, int length) {
long copiedBytes = Buffers.copy(content, dsts, offset, length);
c.advance(copiedBytes);
}

private IOException closeWithError(String message) throws IOException {
close();
StorageException cause =
new StorageException(HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, message);
throw new IOException(message, cause);
}

/**
* Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of
* {@link #read}
*/
private static final class ReadCursor {
private final long beginning;
private long offset;
private final long limit;

private ReadCursor(long beginning, long limit) {
this.limit = limit;
this.beginning = beginning;
this.offset = beginning;
}

public boolean hasRemaining() {
return limit - offset > 0;
}

public void advance(long incr) {
checkArgument(incr >= 0);
offset += incr;
}

public long read() {
return offset - beginning;
}

@Override
public String toString() {
return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit);
}
}

private final class LazyServerStreamIterator implements Iterator<ReadObjectResponse>, Closeable {
private ServerStream<ReadObjectResponse> serverStream;
private Iterator<ReadObjectResponse> responseIterator;
Expand Down
Expand Up @@ -28,15 +28,18 @@
final class GrpcBlobReadChannel extends BaseStorageReadChannel<Object> {

private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
private final ResponseContentLifecycleManager responseContentLifecycleManager;
private final ReadObjectRequest request;
private final boolean autoGzipDecompression;

GrpcBlobReadChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager,
ReadObjectRequest request,
boolean autoGzipDecompression) {
super(Conversions.grpc().blobInfo());
this.read = read;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.request = request;
this.autoGzipDecompression = autoGzipDecompression;
}
Expand All @@ -53,7 +56,7 @@ protected LazyReadChannel<?, Object> newLazyReadChannel() {
ReadableByteChannelSessionBuilder b =
ResumableMedia.gapic()
.read()
.byteChannel(read)
.byteChannel(read, responseContentLifecycleManager)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
Expand Down
Expand Up @@ -180,6 +180,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
.collect(ImmutableSet.toImmutableSet())));

final StorageClient storageClient;
final ResponseContentLifecycleManager responseContentLifecycleManager;
final WriterFactory writerFactory;
final GrpcConversions codecs;
final GrpcRetryAlgorithmManager retryAlgorithmManager;
Expand All @@ -192,10 +193,12 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
GrpcStorageImpl(
GrpcStorageOptions options,
StorageClient storageClient,
ResponseContentLifecycleManager responseContentLifecycleManager,
WriterFactory writerFactory,
Opts<UserProject> defaultOpts) {
super(options);
this.storageClient = storageClient;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.writerFactory = writerFactory;
this.defaultOpts = defaultOpts;
this.codecs = Conversions.grpc();
Expand Down Expand Up @@ -716,8 +719,10 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
ReadObjectRequest request = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);

return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
responseContentLifecycleManager,
request,
!opts.autoGzipDecompression());
}
Expand Down Expand Up @@ -1868,7 +1873,9 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
return ResumableMedia.gapic()
.read()
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
.byteChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
responseContentLifecycleManager)
.setAutoGzipDecompression(!opts.autoGzipDecompression())
.unbuffered()
.setReadObjectRequest(readObjectRequest)
Expand Down

0 comments on commit 8c7404d

Please sign in to comment.