From 5c9cecf04ceb3858d58b4e2e487ffd1dddf933ab Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 13 Jun 2024 17:55:16 -0400 Subject: [PATCH] fix: update grpc bidi resumable uploads to validate ack'd object size (#2570) * fix: update grpc bidi resumable uploads to validate ack'd object size Follow up to #2527 Part 1, add tests and simulation server * fix: update grpc bidi resumable uploads to validate ack'd object size Follow up to #2527 Cleanup unused inner-class * fix: update grpc bidi resumable uploads to validate ack'd object size Follow up to #2527 Move request trimming inline rather than at the end * fix: update grpc bidi resumable uploads to validate ack'd object size Follow up to #2527 Update GapicBidiUnbufferedWritableByteChannel to correctly handle failure scenarios {1, 2, 3, 4, 4_1, 4_2, 5, 7} Refactor ResumableSessionFailureScenario to accept Message and List rather than WriteObjectResponse and List. The shape of BidiWriteObjectRequest/WriteObjectRequest and BidiWriteObjectResponse/WriteObjectResponse are largely the same (bidi has two extra fields) so rather than overloading toStorageException yet again, this time there is a single method for grpc messages that internally can branch when formatting the request. In this case, we're quite safe taking this relaxed typing because it is an internal API that is only called from a grpc context where protos will be used. * fix: update grpc bidi resumable uploads to validate ack'd object size Follow up to #2527 Update GapicBidiUnbufferedWritableByteChannel to correctly handle partial consumption of content Tests passing now. Separate tracking of client detected errors and stream errors. When await is invoked, if a client detected error is present AND a stream error is present, the client detected error will take precedence. If a stream error happens and the client detected error has not yet been observed, the stream error will be added as a suppressed exception to the client detected error. * chore: fix failing tests --- .../cloud/storage/BidiResumableWrite.java | 2 +- .../google/cloud/storage/BidiWriteCtx.java | 33 - ...apicBidiUnbufferedWritableByteChannel.java | 307 ++++-- ...BidiWritableByteChannelSessionBuilder.java | 2 +- .../ResumableSessionFailureScenario.java | 178 +++- .../com/google/cloud/storage/FakeServer.java | 2 + ...BidiUnbufferedWritableByteChannelTest.java | 938 ++++++++++++++++++ .../GrpcPlainRequestLoggingInterceptor.java | 38 + 8 files changed, 1340 insertions(+), 160 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java index dab7b2474..0f5a378f8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java @@ -52,7 +52,7 @@ public StartResumableWriteResponse getRes() { @Override public BidiWriteObjectRequest.Builder newBuilder() { - return writeRequest.toBuilder(); + return writeRequest.toBuilder().clearWriteObjectSpec(); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java index 09e4177c2..a458c079c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java @@ -16,8 +16,6 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; - import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.storage.v2.BidiWriteObjectRequest; @@ -84,36 +82,5 @@ interface BidiWriteObjectRequestBuilderFactory { @Nullable String bucketName(); - - static BidiSimpleWriteObjectRequestBuilderFactory simple(BidiWriteObjectRequest req) { - return new BidiSimpleWriteObjectRequestBuilderFactory(req); - } - } - - static final class BidiSimpleWriteObjectRequestBuilderFactory - implements BidiWriteObjectRequestBuilderFactory { - private final BidiWriteObjectRequest req; - - private BidiSimpleWriteObjectRequestBuilderFactory(BidiWriteObjectRequest req) { - this.req = req; - } - - @Override - public BidiWriteObjectRequest.Builder newBuilder() { - return req.toBuilder(); - } - - @Override - public @Nullable String bucketName() { - if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) { - return req.getWriteObjectSpec().getResource().getBucket(); - } - return null; - } - - @Override - public String toString() { - return "SimpleBidiWriteObjectRequestBuilderFactory{" + "req=" + fmtProto(req) + '}'; - } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java index 19aba735e..15278616d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -21,23 +21,28 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.OutOfRangeException; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.ObjectChecksums; +import io.grpc.Status; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Semaphore; import java.util.function.Supplier; @@ -47,18 +52,19 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable private final BidiStreamingCallable write; private final RetryingDependencies deps; private final ResultRetryAlgorithm alg; - private final String bucketName; - private final Supplier baseContextSupplier; private final SettableApiFuture resultFuture; private final ChunkSegmenter chunkSegmenter; private final BidiWriteCtx writeCtx; + private final GrpcCallContext context; private final BidiObserver responseObserver; private volatile ApiStreamObserver stream; private boolean open = true; private boolean first = true; private boolean finished = false; + private volatile BidiWriteObjectRequest lastWrittenRequest; + private volatile RewindableContent currentContent; GapicBidiUnbufferedWritableByteChannel( BidiStreamingCallable write, @@ -66,18 +72,18 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable ResultRetryAlgorithm alg, SettableApiFuture resultFuture, ChunkSegmenter chunkSegmenter, - BidiResumableWrite requestFactory, + BidiWriteCtx writeCtx, Supplier baseContextSupplier) { this.write = write; this.deps = deps; this.alg = alg; - this.baseContextSupplier = baseContextSupplier; - this.bucketName = requestFactory.bucketName(); this.resultFuture = resultFuture; this.chunkSegmenter = chunkSegmenter; - this.writeCtx = new BidiWriteCtx<>(requestFactory); + this.writeCtx = writeCtx; this.responseObserver = new BidiObserver(); + String bucketName = writeCtx.getRequestFactory().bucketName(); + this.context = contextWithBucketName(bucketName, baseContextSupplier.get()); } @Override @@ -102,22 +108,22 @@ public void close() throws IOException { if (!open) { return; } - ApiStreamObserver openedStream = openedStream(); - if (!finished) { - BidiWriteObjectRequest message = finishMessage(); - try { - openedStream.onNext(message); - finished = true; - openedStream.onCompleted(); - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; + try { + if (!finished) { + BidiWriteObjectRequest message = finishMessage(); + lastWrittenRequest = message; + flush(Collections.singletonList(message)); + } else { + if (stream != null) { + stream.onCompleted(); + responseObserver.await(); + } } - } else { - openedStream.onCompleted(); + } finally { + open = false; + stream = null; + lastWrittenRequest = null; } - responseObserver.await(); - open = false; } @VisibleForTesting @@ -131,12 +137,18 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo throw new ClosedChannelException(); } - ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + long begin = writeCtx.getConfirmedBytes().get(); + currentContent = RewindableContent.of(srcs, srcsOffset, srcsLength); + ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, finalize); + if (data.length == 0) { + currentContent = null; + return 0; + } List messages = new ArrayList<>(); - int bytesConsumed = 0; - for (ChunkSegment datum : data) { + for (int i = 0; i < data.length; i++) { + ChunkSegment datum = data[i]; Crc32cLengthKnown crc32c = datum.getCrc32c(); ByteString b = datum.getB(); int contentSize = b.size(); @@ -149,11 +161,14 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo if (crc32c != null) { checksummedData.setCrc32C(crc32c.getValue()); } - BidiWriteObjectRequest.Builder builder = - writeCtx - .newRequestBuilder() - .setWriteOffset(offset) - .setChecksummedData(checksummedData.build()); + BidiWriteObjectRequest.Builder builder = writeCtx.newRequestBuilder(); + if (!first) { + builder.clearUploadId(); + builder.clearObjectChecksums(); + } else { + first = false; + } + builder.setWriteOffset(offset).setChecksummedData(checksummedData.build()); if (!datum.isOnlyFullBlocks()) { builder.setFinishWrite(true); if (cumulative != null) { @@ -163,10 +178,17 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo finished = true; } - BidiWriteObjectRequest build = possiblyPairDownBidiRequest(builder, first).build(); - first = false; + if (i == data.length - 1 && !finished) { + if (finalize) { + builder.setFinishWrite(true); + finished = true; + } else { + builder.setFlush(true).setStateLookup(true); + } + } + + BidiWriteObjectRequest build = builder.build(); messages.add(build); - bytesConsumed += contentSize; } if (finalize && !finished) { messages.add(finishMessage()); @@ -176,10 +198,14 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo try { flush(messages); } catch (RuntimeException e) { + open = false; resultFuture.setException(e); throw e; } + long end = writeCtx.getConfirmedBytes().get(); + + long bytesConsumed = end - begin; return bytesConsumed; } @@ -188,8 +214,11 @@ private BidiWriteObjectRequest finishMessage() { long offset = writeCtx.getTotalSentBytes().get(); Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); - BidiWriteObjectRequest.Builder b = - writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder(); + if (!first) { + b.clearUploadId().clearObjectChecksums(); + } + b.setFinishWrite(true).setWriteOffset(offset); if (crc32cValue != null) { b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); } @@ -201,13 +230,9 @@ private ApiStreamObserver openedStream() { if (stream == null) { synchronized (this) { if (stream == null) { - GrpcCallContext internalContext = - contextWithBucketName(bucketName, baseContextSupplier.get()); + responseObserver.reset(); stream = - this.write - .withDefaultCallContext(internalContext) - .bidiStreamingCall(responseObserver); - responseObserver.sem.drainPermits(); + new GracefulOutboundStream(this.write.bidiStreamingCall(responseObserver, context)); } } } @@ -223,47 +248,28 @@ private void flush(@NonNull List segments) { ApiStreamObserver opened = openedStream(); for (BidiWriteObjectRequest message : segments) { opened.onNext(message); + lastWrittenRequest = message; } - if (!finished) { - BidiWriteObjectRequest message = - BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build(); - opened.onNext(message); + if (lastWrittenRequest.getFinishWrite()) { + opened.onCompleted(); } responseObserver.await(); return null; - } catch (Exception e) { + } catch (Throwable t) { stream = null; first = true; - throw e; + t.addSuppressed(new AsyncStorageTaskException()); + throw t; } }, Decoder.identity()); } - private static BidiWriteObjectRequest.Builder possiblyPairDownBidiRequest( - BidiWriteObjectRequest.Builder b, boolean firstMessageOfStream) { - if (firstMessageOfStream && b.getWriteOffset() == 0) { - return b; - } - - if (!firstMessageOfStream) { - b.clearUploadId(); - } - - if (b.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b; - } - private class BidiObserver implements ApiStreamObserver { private final Semaphore sem; private volatile BidiWriteObjectResponse last; + private volatile StorageException clientDetectedError; private volatile RuntimeException previousError; private BidiObserver() { @@ -272,22 +278,96 @@ private BidiObserver() { @Override public void onNext(BidiWriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - writeCtx.getConfirmedBytes().set((value.getPersistedSize())); - } else if (value.hasResource()) { - writeCtx.getConfirmedBytes().set(value.getResource().getSize()); + boolean finalizing = lastWrittenRequest.getFinishWrite(); + if (!finalizing && value.hasPersistedSize()) { // incremental + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = value.getPersistedSize(); + + if (totalSentBytes == persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + ok(value); + } else if (persistedSize < totalSentBytes) { + long delta = totalSentBytes - persistedSize; + // rewind our content and any state that my have run ahead of the actual ack'd bytes + currentContent.rewindTo(delta); + writeCtx.getTotalSentBytes().set(persistedSize); + writeCtx.getConfirmedBytes().set(persistedSize); + ok(value); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_7.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else if (finalizing && value.hasResource()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long finalSize = value.getResource().getSize(); + if (totalSentBytes == finalSize) { + writeCtx.getConfirmedBytes().set(finalSize); + ok(value); + } else if (finalSize < totalSentBytes) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else if (!finalizing && value.hasResource()) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_1.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else if (finalizing && value.hasPersistedSize()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = value.getPersistedSize(); + // if a flush: true, state_lookup: true message is in the stream along with a + // finish_write: true, GCS can respond with the incremental update, gracefully handle this + // message + if (totalSentBytes == persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + } else if (persistedSize < totalSentBytes) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_3.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_2.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); } - sem.release(); - last = value; } @Override public void onError(Throwable t) { - if (t instanceof RuntimeException) { + if (t instanceof OutOfRangeException) { + OutOfRangeException oore = (OutOfRangeException) t; + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( + ImmutableList.of(lastWrittenRequest), null, context, oore)); + } else if (t instanceof ApiException) { + // use StorageExceptions logic to translate from ApiException to our status codes ensuring + // things fall in line with our retry handlers. + // This is suboptimal, as it will initialize a second exception, however this is the + // unusual case, and it should not cause a significant overhead given its rarity. + StorageException tmp = StorageException.asStorageException((ApiException) t); + previousError = + ResumableSessionFailureScenario.toStorageException( + tmp.getCode(), + tmp.getMessage(), + tmp.getReason(), + ImmutableList.of(lastWrittenRequest), + null, + context, + t); + sem.release(); + } else if (t instanceof RuntimeException) { previousError = (RuntimeException) t; + sem.release(); } - sem.release(); } @Override @@ -298,6 +378,25 @@ public void onCompleted() { sem.release(); } + private void ok(BidiWriteObjectResponse value) { + last = value; + sem.release(); + } + + private void clientDetectedError(StorageException storageException) { + open = false; + clientDetectedError = storageException; + // yes, check that previousError is not the same instance as e + if (previousError != null && previousError != storageException) { + storageException.addSuppressed(previousError); + previousError = null; + } + if (previousError == null) { + previousError = storageException; + } + sem.release(); + } + void await() { try { sem.acquire(); @@ -308,11 +407,69 @@ void await() { throw new RuntimeException(e); } } + StorageException e = clientDetectedError; RuntimeException err = previousError; + clientDetectedError = null; + previousError = null; + if ((e != null || err != null) && stream != null) { + if (lastWrittenRequest.getFinishWrite()) { + stream.onCompleted(); + } else { + stream.onError(Status.CANCELLED.asRuntimeException()); + } + } + if (e != null) { + throw e; + } if (err != null) { - previousError = null; throw err; } } + + public void reset() { + sem.drainPermits(); + last = null; + clientDetectedError = null; + previousError = null; + } + } + + /** + * Prevent "already half-closed" if we previously called onComplete but then detect an error and + * call onError + */ + private static final class GracefulOutboundStream + implements ApiStreamObserver { + + private final ApiStreamObserver delegate; + private volatile boolean closing; + + private GracefulOutboundStream(ApiStreamObserver delegate) { + this.delegate = delegate; + this.closing = false; + } + + @Override + public void onNext(BidiWriteObjectRequest value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable t) { + if (closing) { + return; + } + closing = true; + delegate.onError(t); + } + + @Override + public void onCompleted() { + if (closing) { + return; + } + closing = true; + delegate.onCompleted(); + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java index 536eba7fb..05387326a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java @@ -155,7 +155,7 @@ BufferedWritableByteChannelSession build() { resultFuture, new ChunkSegmenter( boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), - start, + new BidiWriteCtx<>(start), Retrying::newCallContext)) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java index 8c06d70dd..88dbaf7bd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java @@ -26,11 +26,11 @@ import com.google.cloud.BaseServiceException; import com.google.cloud.storage.StorageException.IOExceptionCallable; import com.google.common.io.CharStreams; -import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.Message; +import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.WriteObjectRequest; -import com.google.storage.v2.WriteObjectResponse; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.io.InputStreamReader; @@ -81,10 +81,6 @@ enum ResumableSessionFailureScenario { private static final String PREFIX_I = "\t|< "; private static final String PREFIX_O = "\t|> "; private static final String PREFIX_X = "\t| "; - // define some constants for tab widths that are more compressed that the literals - private static final String T1 = "\t"; - private static final String T2 = "\t\t"; - private static final String T3 = "\t\t\t"; private static final Predicate includedHeaders = matches("Content-Length") @@ -134,8 +130,10 @@ StorageException toStorageException( } StorageException toStorageException( - @NonNull List<@NonNull WriteObjectRequest> reqs, - @Nullable WriteObjectResponse resp, + /*In java List is not a sub-type of List despite WriteObjectRequest being a Message. + * intentionally only define List so the compiler doesn't complain */ + @SuppressWarnings("rawtypes") @NonNull List reqs, + @Nullable Message resp, @NonNull GrpcCallContext context, @Nullable Throwable cause) { return toStorageException(code, message, reason, reqs, resp, context, cause); @@ -161,8 +159,8 @@ static StorageException toStorageException( int code, String message, @Nullable String reason, - @NonNull List<@NonNull WriteObjectRequest> reqs, - @Nullable WriteObjectResponse resp, + @NonNull List reqs, + @Nullable Message resp, @NonNull GrpcCallContext context, @Nullable Throwable cause) { final StringBuilder sb = new StringBuilder(); @@ -177,35 +175,8 @@ static StorageException toStorageException( } else { sb.append(","); } - WriteObjectRequest req = reqs.get(i); - sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{"); - if (req.hasUploadId()) { - sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId()); - } - long writeOffset = req.getWriteOffset(); - if (req.hasChecksummedData()) { - ChecksummedData checksummedData = req.getChecksummedData(); - sb.append("\n").append(PREFIX_O).append(T2); - sb.append( - String.format( - "checksummed_data: {range: [%d:%d]", - writeOffset, writeOffset + checksummedData.getContent().size())); - if (checksummedData.hasCrc32C()) { - sb.append(", crc32c: ").append(checksummedData.getCrc32C()); - } - sb.append("}"); - } else { - sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset); - } - if (req.getFinishWrite()) { - sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true"); - } - if (req.hasObjectChecksums()) { - ObjectChecksums objectChecksums = req.getObjectChecksums(); - sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{"); - fmt(objectChecksums, PREFIX_O, T3, sb); - sb.append("\n").append(PREFIX_O).append(T2).append("}"); - } + Message req = (Message) reqs.get(i); + fmt(req, PREFIX_O, Indentation.T1, sb); sb.append("\n").append(PREFIX_O).append("\t}"); if (i == length - 1) { sb.append("\n").append(PREFIX_O).append("]"); @@ -217,7 +188,7 @@ static StorageException toStorageException( // response context if (resp != null) { sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{"); - fmt(resp, PREFIX_I, T1, sb); + fmt(resp, PREFIX_I, Indentation.T1, sb); sb.append("\n").append(PREFIX_I).append("}"); sb.append("\n").append(PREFIX_X); } @@ -250,7 +221,8 @@ static StorageException toStorageException( sb.append("\n").append(PREFIX_X); } } - return new StorageException(code, sb.toString(), reason, cause); + StorageException se = new StorageException(code, sb.toString(), reason, cause); + return se; } static StorageException toStorageException( @@ -359,16 +331,122 @@ private static String headerValueToString(Object o) { } private static void fmt( - MessageOrBuilder msg, + Message msg, @SuppressWarnings("SameParameterValue") String prefix, - String indentation, + Indentation indentation, StringBuilder sb) { - String string = msg.toString(); - // drop the final new line before prefixing - string = string.replaceAll("\n$", ""); - sb.append("\n") - .append(prefix) - .append(indentation) - .append(string.replaceAll("\r?\n", "\n" + prefix + indentation)); + if (msg instanceof WriteObjectRequest) { + WriteObjectRequest req = (WriteObjectRequest) msg; + fmtWriteObjectRequest(req, prefix, indentation, sb); + } else if (msg instanceof BidiWriteObjectRequest) { + BidiWriteObjectRequest req = (BidiWriteObjectRequest) msg; + fmtBidiWriteObjectRequest(req, prefix, indentation, sb); + } else { + String string = msg.toString(); + // drop the final new line before prefixing + string = string.replaceAll("\n$", ""); + sb.append("\n") + .append(prefix) + .append(indentation) + .append(string.replaceAll("\r?\n", "\n" + prefix + indentation.indentation)); + } + } + + private static void fmtWriteObjectRequest( + WriteObjectRequest req, String prefix, Indentation t1, StringBuilder sb) { + Indentation t2 = t1.indent(); + Indentation t3 = t2.indent(); + sb.append("\n").append(prefix).append(t1).append(req.getClass().getName()).append("{"); + if (req.hasUploadId()) { + sb.append("\n").append(prefix).append(t2).append("upload_id: ").append(req.getUploadId()); + } + long writeOffset = req.getWriteOffset(); + if (req.hasChecksummedData()) { + ChecksummedData checksummedData = req.getChecksummedData(); + sb.append("\n").append(prefix).append(t2); + sb.append( + String.format( + "checksummed_data: {range: [%d:%d]", + writeOffset, writeOffset + checksummedData.getContent().size())); + if (checksummedData.hasCrc32C()) { + sb.append(", crc32c: ").append(checksummedData.getCrc32C()); + } + sb.append("}"); + } else { + sb.append("\n").append(prefix).append(t2).append("write_offset: ").append(writeOffset); + } + if (req.getFinishWrite()) { + sb.append("\n").append(prefix).append(t2).append("finish_write: true"); + } + if (req.hasObjectChecksums()) { + ObjectChecksums objectChecksums = req.getObjectChecksums(); + sb.append("\n").append(prefix).append(t2).append("object_checksums: ").append("{"); + fmt(objectChecksums, prefix, t3, sb); + sb.append("\n").append(prefix).append(t2).append("}"); + } + } + + private static void fmtBidiWriteObjectRequest( + BidiWriteObjectRequest req, String prefix, Indentation t1, StringBuilder sb) { + Indentation t2 = t1.indent(); + Indentation t3 = t2.indent(); + sb.append("\n").append(prefix).append(t1).append(req.getClass().getName()).append("{"); + if (req.hasUploadId()) { + sb.append("\n").append(prefix).append(t2).append("upload_id: ").append(req.getUploadId()); + } + long writeOffset = req.getWriteOffset(); + if (req.hasChecksummedData()) { + ChecksummedData checksummedData = req.getChecksummedData(); + sb.append("\n").append(prefix).append(t2); + sb.append( + String.format( + "checksummed_data: {range: [%d:%d]", + writeOffset, writeOffset + checksummedData.getContent().size())); + if (checksummedData.hasCrc32C()) { + sb.append(", crc32c: ").append(checksummedData.getCrc32C()); + } + sb.append("}"); + } else { + sb.append("\n").append(prefix).append(t2).append("write_offset: ").append(writeOffset); + } + if (req.getFlush()) { + sb.append("\n").append(prefix).append(t2).append("flush: true"); + } + if (req.getStateLookup()) { + sb.append("\n").append(prefix).append(t2).append("state_lookup: true"); + } + if (req.getFinishWrite()) { + sb.append("\n").append(prefix).append(t2).append("finish_write: true"); + } + if (req.hasObjectChecksums()) { + ObjectChecksums objectChecksums = req.getObjectChecksums(); + sb.append("\n").append(prefix).append(t2).append("object_checksums: ").append("{"); + fmt(objectChecksums, prefix, t3, sb); + sb.append("\n").append(prefix).append(t2).append("}"); + } + } + + enum Indentation { + T1("\t"), + T2("\t\t"), + T3("\t\t\t"), + T4("\t\t\t\t"), + ; + + private final String indentation; + + Indentation(String indentation) { + this.indentation = indentation; + } + + Indentation indent() { + int ordinal = ordinal(); + return values()[ordinal + 1]; + } + + @Override + public String toString() { + return indentation; + } } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java index d27783425..578f6c7dc 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java @@ -17,6 +17,7 @@ package com.google.cloud.storage; import com.google.cloud.NoCredentials; +import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor; import com.google.storage.v2.StorageGrpc; import com.google.storage.v2.StorageSettings; import io.grpc.Server; @@ -58,6 +59,7 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { .setHost("http://" + endpoint) .setProjectId("test-proj") .setCredentials(NoCredentials.getInstance()) + .setGrpcInterceptorProvider(GrpcPlainRequestLoggingInterceptor.getInterceptorProvider()) .build(); return new FakeServer(server, grpcStorageOptions); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java new file mode 100644 index 000000000..7c3fb1fd7 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java @@ -0,0 +1,938 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.ByteSizeConstants._512KiB; +import static com.google.cloud.storage.ByteSizeConstants._768KiB; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertThrows; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.Object; +import com.google.storage.v2.StartResumableWriteRequest; +import com.google.storage.v2.StartResumableWriteResponse; +import com.google.storage.v2.StorageClient; +import com.google.storage.v2.StorageGrpc.StorageImplBase; +import io.grpc.Status.Code; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.logging.Logger; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; + +public final class ITGapicBidiUnbufferedWritableByteChannelTest { + + private static final ChunkSegmenter CHUNK_SEGMENTER = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + + /** + * + * + *

S.1

+ * + * Attempting to append to a session which has already been finalized should raise an error + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = { name = obj, size = 524288 }
+   *     
client state
+   * write_offset = 0, data = [0:262144]
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset= 0, checksummed_data.content.length = 262144 }
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 525288 } })
+   *     
+ */ + @Test + public void scenario1() throws Exception { + + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_512KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + SettableApiFuture done = SettableApiFuture.create(); + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.2

+ * + * Attempting to finalize a session with fewer bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 524288
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ persisted_size = 525288 })
+   *     
+ */ + @Test + public void scenario2() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.3

+ * + * Attempting to finalize a session with more bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 262144
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ persisted_size = 262144 })
+   *     
+ */ + @Test + public void scenario3() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4

+ * + * Attempting to finalize an already finalized session + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + channel.close(); + + BidiWriteObjectResponse BidiWriteObjectResponse = done.get(2, TimeUnit.SECONDS); + assertThat(BidiWriteObjectResponse).isEqualTo(resp1); + } + } + + /** + * + * + *

S.4.1

+ * + * Attempting to finalize an already finalized session (ack < expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4_1() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4.2

+ * + * Attempting to finalize an already finalized session (ack > expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 786432}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 786432 } })
+   *     
+ */ + @Test + public void scenario4_2() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_768KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.5

+ * + * Attempt to append to a resumable session with an offset higher than GCS expects + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 0
+   *     
client state
+   * write_offset = 262144, data = [262144:524288]
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, checksummed_data.content.length = 262144}
+   *     
response
+   * onError(Status{code=OUT_OF_RANGE, description="Upload request started at offset '262144', which is past expected offset '0'."})
+   *     
+ */ + @Test + public void scenario5() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB)))) + .setStateLookup(true) + .setFlush(true) + .build(); + StorageImplBase service1 = + new BidiWriteService( + (obs, requests) -> { + if (requests.equals(ImmutableList.of(req1))) { + obs.onError( + TestUtils.apiException( + Code.OUT_OF_RANGE, + "Upload request started at offset '262144', which is past expected offset '0'.")); + } else { + obs.onError( + TestUtils.apiException(Code.PERMISSION_DENIED, "Unexpected request chain.")); + } + }); + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.7

+ * + * GCS Acknowledges more bytes than were sent in the PUT + * + *

The client believes the server offset is N, it sends K bytes and the server responds that N + * + 2K bytes are now committed. + * + *

The client has detected data loss and should raise an error and prevent sending of more + * bytes. + */ + @Test + public void scenario7() throws Exception { + + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + @Test + public void incremental_success() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(0), + () -> assertThat(written).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + } + } + + @Test + public void incremental_partialSuccess() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_512KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + chunkSegmenter, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(_256KiB), + () -> assertThat(written).isEqualTo(_256KiB), + () -> + assertWithMessage("totalSentBytes") + .that(writeCtx.getTotalSentBytes().get()) + .isEqualTo(_256KiB), + () -> + assertWithMessage("confirmedBytes") + .that(writeCtx.getConfirmedBytes().get()) + .isEqualTo(_256KiB)); + } + } + + private static @NonNull BidiResumableWrite getResumableWrite(String uploadId) { + StartResumableWriteRequest req = StartResumableWriteRequest.getDefaultInstance(); + StartResumableWriteResponse resp = + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); + return new BidiResumableWrite( + req, resp, id -> BidiWriteObjectRequest.newBuilder().setUploadId(id).build()); + } + + static class BidiWriteService extends StorageImplBase { + private static final Logger LOGGER = Logger.getLogger(BidiWriteService.class.getName()); + private final BiConsumer, List> + c; + + private ImmutableList.Builder requests; + + BidiWriteService( + BiConsumer, List> c) { + this.c = c; + this.requests = new ImmutableList.Builder<>(); + } + + BidiWriteService(ImmutableMap, BidiWriteObjectResponse> writes) { + this( + (obs, build) -> { + if (writes.containsKey(build)) { + obs.onNext(writes.get(build)); + last(build) + .filter(BidiWriteObjectRequest::getFinishWrite) + .ifPresent(ignore -> obs.onCompleted()); + } else { + logUnexpectedRequest(writes.keySet(), build); + obs.onError( + TestUtils.apiException(Code.PERMISSION_DENIED, "Unexpected request chain.")); + } + }); + } + + private static Optional last(List l) { + if (l.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(l.get(l.size() - 1)); + } + } + + private static void logUnexpectedRequest( + Set> writes, List build) { + Collector joining = Collectors.joining(",\n\t", "[\n\t", "\n]"); + Collector oneLine = Collectors.joining(",", "[", "]"); + String msg = + String.format( + "Unexpected Request Chain.%nexpected one of: %s%n but was: %s", + writes.stream() + .map(l -> l.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine)) + .collect(joining), + build.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine)); + LOGGER.warning(msg); + } + + @Override + public StreamObserver bidiWriteObject( + StreamObserver obs) { + return new Adapter() { + @Override + public void onNext(BidiWriteObjectRequest value) { + requests.add(value); + if ((value.getFlush() && value.getStateLookup()) || value.getFinishWrite()) { + ImmutableList build = requests.build(); + c.accept(obs, build); + } + } + + @Override + public void onError(Throwable t) { + requests = new ImmutableList.Builder<>(); + } + + @Override + public void onCompleted() { + requests = new ImmutableList.Builder<>(); + } + }; + } + } + + private abstract static class Adapter extends CallStreamObserver { + + private Adapter() {} + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + + @Override + public void setMessageCompression(boolean enable) {} + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java index 7116108d7..611f7fd1c 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java @@ -20,6 +20,8 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.protobuf.MessageOrBuilder; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.ReadObjectResponse; import com.google.storage.v2.WriteObjectRequest; import io.grpc.CallOptions; import io.grpc.Channel; @@ -109,6 +111,10 @@ public void sendMessage(ReqT message) { static String fmtProto(@NonNull Object obj) { if (obj instanceof WriteObjectRequest) { return fmtProto((WriteObjectRequest) obj); + } else if (obj instanceof BidiWriteObjectRequest) { + return fmtProto((BidiWriteObjectRequest) obj); + } else if (obj instanceof ReadObjectResponse) { + return fmtProto((ReadObjectResponse) obj); } else if (obj instanceof MessageOrBuilder) { return fmtProto((MessageOrBuilder) obj); } else { @@ -137,6 +143,38 @@ static String fmtProto(@NonNull WriteObjectRequest msg) { return msg.toString(); } + @NonNull + static String fmtProto(@NonNull BidiWriteObjectRequest msg) { + if (msg.hasChecksummedData()) { + ByteString content = msg.getChecksummedData().getContent(); + if (content.size() > 20) { + BidiWriteObjectRequest.Builder b = msg.toBuilder(); + ByteString snip = ByteString.copyFromUtf8(String.format("", content.size())); + ByteString trim = content.substring(0, 20).concat(snip); + b.getChecksummedDataBuilder().setContent(trim); + + return b.build().toString(); + } + } + return msg.toString(); + } + + @NonNull + static String fmtProto(@NonNull ReadObjectResponse msg) { + if (msg.hasChecksummedData()) { + ByteString content = msg.getChecksummedData().getContent(); + if (content.size() > 20) { + ReadObjectResponse.Builder b = msg.toBuilder(); + ByteString snip = ByteString.copyFromUtf8(String.format("", content.size())); + ByteString trim = content.substring(0, 20).concat(snip); + b.getChecksummedDataBuilder().setContent(trim); + + return b.build().toString(); + } + } + return msg.toString(); + } + private static final class InterceptorProvider implements GrpcInterceptorProvider { private static final InterceptorProvider INSTANCE = new InterceptorProvider();