From 4e64a8b2bf73e07f8b4ee21f1e1f9777fa4fd801 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 26 Jun 2025 16:24:46 -0400 Subject: [PATCH] feat: add default end-to-end checksumming for several upload methods via grpc transport * Storage#blobWriteSession for BlobWriteSessionConfigs.getDefault() * Storage#blobWriteSession for BlobWriteSessionConfigs.bufferToTempDirThenUpload() * Storage#blobWriteSession for BlobWriteSessionConfigs.bufferToDiskThenUpload(*) * Storage#create(BlobInfo, InputStream) * Storage#createFrom(BlobInfo, InputStream) * Storage#createFrom(BlobInfo, Path) * Storage#writer --- .../DefaultBlobWriteSessionConfig.java | 2 +- ...apicWritableByteChannelSessionBuilder.java | 38 ++++++++----- .../google/cloud/storage/GrpcStorageImpl.java | 8 +-- .../java/com/google/cloud/storage/Hasher.java | 19 +++++++ .../JournalingBlobWriteSessionConfig.java | 3 +- .../com/google/cloud/storage/WriteCtx.java | 17 +++--- ...unkedResumableWritableByteChannelTest.java | 54 ++++++++++--------- ...bufferedDirectWritableByteChannelTest.java | 29 +++++++--- ...CloseResumableWritableByteChannelTest.java | 20 ++++--- ...apicUnbufferedWritableByteChannelTest.java | 11 ++-- ...fferedWritableByteChannelPropertyTest.java | 11 ++-- .../it/ITObjectChecksumSupportTest.java | 2 + 12 files changed, 134 insertions(+), 80 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 869be68e0d..6e5915bbf0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -163,7 +163,7 @@ public WritableByteChannelSession writeSession( grpc.storageClient .writeObjectCallable() .withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) + .setHasher(opts.getHasher()) .setByteStringStrategy(ByteStringStrategy.copy()) .resumable() .withRetryConfig( diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index ff73dc6892..a256d39b7a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -50,14 +50,14 @@ final class GapicWritableByteChannelSessionBuilder { GapicWritableByteChannelSessionBuilder( ClientStreamingCallable write) { this.write = write; - this.hasher = Hasher.noop(); + this.hasher = Hasher.defaultHasher(); this.byteStringStrategy = ByteStringStrategy.copy(); } /** * Set the {@link Hasher} to apply to the bytes passing through the built session's channel. * - *

Default: {@link Hasher#noop()} + *

Default: {@link Hasher#defaultHasher()} * * @see Hasher#enabled() * @see Hasher#noop() @@ -179,14 +179,17 @@ UnbufferedDirectUploadBuilder setRequest(WriteObjectRequest req) { } UnbufferedWritableByteChannelSession build() { + ChunkSegmenter chunkSegmenter = getChunkSegmenter(); return new UnbufferedWriteSession<>( ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")), lift((WriteObjectRequest start, SettableApiFuture resultFuture) -> new GapicUnbufferedDirectWritableByteChannel( resultFuture, - getChunkSegmenter(), + chunkSegmenter, write, - new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start)))) + WriteCtx.of( + WriteObjectRequestBuilderFactory.simple(start), + chunkSegmenter.getHasher()))) .andThen(StorageByteChannels.writable()::createSynchronized)); } } @@ -207,14 +210,17 @@ BufferedDirectUploadBuilder setRequest(WriteObjectRequest req) { } BufferedWritableByteChannelSession build() { + ChunkSegmenter chunkSegmenter = getChunkSegmenter(); return new BufferedWriteSession<>( ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")), lift((WriteObjectRequest start, SettableApiFuture resultFuture) -> new GapicUnbufferedDirectWritableByteChannel( resultFuture, - getChunkSegmenter(), + chunkSegmenter, write, - new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start)))) + WriteCtx.of( + WriteObjectRequestBuilderFactory.simple(start), + chunkSegmenter.getHasher()))) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); } @@ -290,20 +296,24 @@ UnbufferedResumableUploadBuilder setStartAsync(ApiFuture start) UnbufferedWritableByteChannelSession build() { RetrierWithAlg boundRetrier = retrier; + ChunkSegmenter chunkSegmenter = getChunkSegmenter(); return new UnbufferedWriteSession<>( requireNonNull(start, "start must be non null"), lift((ResumableWrite start, SettableApiFuture result) -> { if (fsyncEvery) { return new GapicUnbufferedChunkedResumableWritableByteChannel( result, - getChunkSegmenter(), + chunkSegmenter, write, - new WriteCtx<>(start), + WriteCtx.of(start, chunkSegmenter.getHasher()), boundRetrier, Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, new WriteCtx<>(start)); + result, + chunkSegmenter, + write, + WriteCtx.of(start, chunkSegmenter.getHasher())); } }) .andThen(StorageByteChannels.writable()::createSynchronized)); @@ -330,20 +340,24 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture start) { } BufferedWritableByteChannelSession build() { + ChunkSegmenter chunkSegmenter = getChunkSegmenter(); return new BufferedWriteSession<>( requireNonNull(start, "start must be non null"), lift((ResumableWrite start, SettableApiFuture result) -> { if (fsyncEvery) { return new GapicUnbufferedChunkedResumableWritableByteChannel( result, - getChunkSegmenter(), + chunkSegmenter, write, - new WriteCtx<>(start), + WriteCtx.of(start, chunkSegmenter.getHasher()), retrier, Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, new WriteCtx<>(start)); + result, + chunkSegmenter, + write, + WriteCtx.of(start, chunkSegmenter.getHasher())); } }) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 50b3596c99..3caa13dcb2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -263,7 +263,7 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op GrpcCallContext grpcCallContext = optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults); - Hasher hasher = Hasher.enabled(); + Hasher hasher = optsWithDefaults.getHasher(); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); UnbufferedWritableByteChannelSession session = ResumableMedia.gapic() @@ -324,7 +324,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o write, storageClient.queryWriteStatusCallable(), rw, - Hasher.noop()), + opts.getHasher()), MoreExecutors.directExecutor()); try { GrpcResumableSession got = session2.get(); @@ -365,7 +365,7 @@ public Blob createFrom( .write() .byteChannel( storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) + .setHasher(opts.getHasher()) .setByteStringStrategy(ByteStringStrategy.noCopy()) .resumable() .withRetryConfig(retrier.withAlg(retryAlgorithmManager.idempotent())) @@ -779,7 +779,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - Hasher hasher = Hasher.noop(); + Hasher hasher = opts.getHasher(); // in JSON, the starting of the resumable session happens before the invocation of write can // happen. Emulate the same thing here. // 1. create the future diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index 207b52602e..47a7b029e0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -76,6 +76,15 @@ void validateUnchecked(Crc32cValue expected, ByteString byteString) @Nullable Crc32cLengthKnown nullSafeConcat( @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2); + /** + * The initial value to use for this hasher. + * + *

Not ideal, really we should always start with {@link Crc32cValue#zero()} but this saves us + * from having to plumb the initial value along with the actual hasher to the constructor of the + * WriteCtx when hashing is disabled because of user provided crc32c/md5 preconditions. + */ + @Nullable Crc32cLengthKnown initialValue(); + static NoOpHasher noop() { return NoOpHasher.INSTANCE; } @@ -118,6 +127,11 @@ public void validateUnchecked(Crc32cValue expected, ByteString byteString) {} @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) { return null; } + + @Override + public @Nullable Crc32cLengthKnown initialValue() { + return null; + } } @Immutable @@ -185,6 +199,11 @@ public Crc32cLengthKnown nullSafeConcat( return r1.concat(r2); } } + + @Override + public @NonNull Crc32cLengthKnown initialValue() { + return Crc32cValue.zero(); + } } final class ChecksumMismatchException extends IOException { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java index b770a1b4e7..4d3ad9722c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java @@ -192,7 +192,8 @@ public WritableByteChannelSession writeSession( grpcStorage.startResumableWrite( grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts); ApiFuture> start = - ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor()); + ApiFutures.transform( + f, s -> WriteCtx.of(s, opts.getHasher()), MoreExecutors.directExecutor()); ClientStreamingCallable write = grpcStorage.storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java index 21eaa59a29..5539c04ba9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java @@ -23,6 +23,7 @@ import com.google.storage.v2.WriteObjectRequest; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; final class WriteCtx { @@ -33,22 +34,18 @@ final class WriteCtx { private final AtomicLong confirmedBytes; private final AtomicReference<@Nullable Crc32cLengthKnown> cumulativeCrc32c; - WriteCtx(RequestFactoryT requestFactory) { - this(requestFactory, null); - } - - /** - * TODO: Remove initialValue and replace with Crc32cValue.zero() once all uploads have been - * updated to do e2e checksumming by default. - */ - @Deprecated - WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) { + private WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) { this.requestFactory = requestFactory; this.totalSentBytes = new AtomicLong(0); this.confirmedBytes = new AtomicLong(0); this.cumulativeCrc32c = new AtomicReference<>(initialValue); } + static WriteCtx of( + RFT rft, @NonNull Hasher hasher) { + return new WriteCtx<>(rft, hasher.initialValue()); + } + public RequestFactoryT getRequestFactory() { return requestFactory; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java index 6108a0b0b2..473e73a9c1 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java @@ -26,13 +26,16 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.Hasher.GuavaHasher; import com.google.cloud.storage.ITGapicUnbufferedWritableByteChannelTest.DirectWriteService; import com.google.cloud.storage.Retrying.RetrierWithAlg; +import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.StorageClient; @@ -48,8 +51,9 @@ public final class ITGapicUnbufferedChunkedResumableWritableByteChannelTest { + public static final GuavaHasher HASHER = Hasher.enabled(); private static final ChunkSegmenter CHUNK_SEGMENTER = - new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + new ChunkSegmenter(HASHER, ByteStringStrategy.copy(), _256KiB, _256KiB); /** * @@ -94,10 +98,8 @@ public void scenario1() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setChecksummedData( - ChecksummedData.newBuilder() - .setContent( - ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) - .build()) + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)) + .asChecksummedData()) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder() @@ -115,7 +117,7 @@ public void scenario1() throws Exception { ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); SettableApiFuture done = SettableApiFuture.create(); //noinspection resource GapicUnbufferedChunkedResumableWritableByteChannel channel = @@ -179,6 +181,7 @@ public void scenario2() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_256KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); @@ -194,7 +197,7 @@ public void scenario2() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); @@ -259,6 +262,7 @@ public void scenario3() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); @@ -274,7 +278,7 @@ public void scenario3() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); @@ -339,6 +343,7 @@ public void scenario4() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_256KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -357,7 +362,7 @@ public void scenario4() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); @@ -419,6 +424,7 @@ public void scenario4_1() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -437,7 +443,7 @@ public void scenario4_1() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); @@ -502,6 +508,7 @@ public void scenario4_2() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -520,7 +527,7 @@ public void scenario4_2() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); @@ -586,9 +593,8 @@ public void scenario5() throws Exception { .setUploadId(uploadId) .setWriteOffset(_256KiB) .setChecksummedData( - ChecksummedData.newBuilder() - .setContent( - ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB)))) + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)) + .asChecksummedData()) .build(); StorageImplBase service1 = new DirectWriteService( @@ -611,7 +617,7 @@ public void scenario5() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); @@ -656,10 +662,8 @@ public void scenario7() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setChecksummedData( - ChecksummedData.newBuilder() - .setContent( - ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) - .build()) + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)) + .asChecksummedData()) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); @@ -674,7 +678,7 @@ public void scenario7() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); //noinspection resource GapicUnbufferedChunkedResumableWritableByteChannel channel = @@ -703,10 +707,8 @@ public void incremental_success() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setChecksummedData( - ChecksummedData.newBuilder() - .setContent( - ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) - .build()) + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)) + .asChecksummedData()) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); @@ -721,7 +723,7 @@ public void incremental_success() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); //noinspection resource GapicUnbufferedChunkedResumableWritableByteChannel channel = @@ -768,7 +770,7 @@ public void incremental_partialSuccess() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); ChunkSegmenter chunkSegmenter = new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java index 4a70e9a2e3..e8b9cd7d3b 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java @@ -24,12 +24,14 @@ import static org.junit.Assert.assertThrows; import com.google.api.core.SettableApiFuture; +import com.google.cloud.storage.Hasher.GuavaHasher; import com.google.cloud.storage.ITGapicUnbufferedWritableByteChannelTest.DirectWriteService; import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.StorageClient; import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; @@ -39,14 +41,19 @@ public final class ITGapicUnbufferedDirectWritableByteChannelTest { + public static final GuavaHasher HASHER = Hasher.enabled(); private static final ChunkSegmenter CHUNK_SEGMENTER = - new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + new ChunkSegmenter(HASHER, ByteStringStrategy.copy(), _256KiB, _256KiB); /** Attempting to finalize, ack equals expected */ @Test public void ack_eq() throws Exception { WriteObjectRequest req1 = - WriteObjectRequest.newBuilder().setWriteOffset(_256KiB).setFinishWrite(true).build(); + WriteObjectRequest.newBuilder() + .setWriteOffset(_256KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) + .setFinishWrite(true) + .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder() .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) @@ -63,7 +70,7 @@ public void ack_eq() throws Exception { SettableApiFuture done = SettableApiFuture.create(); WriteCtx writeCtx = - new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(req1)); + WriteCtx.of(WriteObjectRequestBuilderFactory.simple(req1), HASHER); writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(0); @@ -85,7 +92,11 @@ public void ack_eq() throws Exception { @Test public void ack_lt() throws Exception { WriteObjectRequest req1 = - WriteObjectRequest.newBuilder().setWriteOffset(_512KiB).setFinishWrite(true).build(); + WriteObjectRequest.newBuilder() + .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) + .setFinishWrite(true) + .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder() .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) @@ -102,7 +113,7 @@ public void ack_lt() throws Exception { SettableApiFuture done = SettableApiFuture.create(); WriteCtx writeCtx = - new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(req1)); + WriteCtx.of(WriteObjectRequestBuilderFactory.simple(req1), HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); @@ -124,7 +135,11 @@ public void ack_lt() throws Exception { @Test public void ack_gt() throws Exception { WriteObjectRequest req1 = - WriteObjectRequest.newBuilder().setWriteOffset(_512KiB).setFinishWrite(true).build(); + WriteObjectRequest.newBuilder() + .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) + .setFinishWrite(true) + .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder() .setResource(Object.newBuilder().setName("name").setSize(_768KiB).build()) @@ -141,7 +156,7 @@ public void ack_gt() throws Exception { SettableApiFuture done = SettableApiFuture.create(); WriteCtx writeCtx = - new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(req1)); + WriteCtx.of(WriteObjectRequestBuilderFactory.simple(req1), HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java index 8181bd2bc2..e9dd60f7fe 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java @@ -24,10 +24,12 @@ import static org.junit.Assert.assertThrows; import com.google.api.core.SettableApiFuture; +import com.google.cloud.storage.Hasher.GuavaHasher; import com.google.cloud.storage.ITGapicUnbufferedWritableByteChannelTest.DirectWriteService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.StorageClient; @@ -40,8 +42,9 @@ public final class ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest { + public static final GuavaHasher HASHER = Hasher.enabled(); private static final ChunkSegmenter CHUNK_SEGMENTER = - new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + new ChunkSegmenter(HASHER, ByteStringStrategy.copy(), _256KiB, _256KiB); @Test public void incrementalResponseForFinalizingRequest() throws Exception { @@ -50,6 +53,7 @@ public void incrementalResponseForFinalizingRequest() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); @@ -65,7 +69,7 @@ public void incrementalResponseForFinalizingRequest() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); @@ -75,7 +79,6 @@ public void incrementalResponseForFinalizingRequest() throws Exception { done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); StorageException se = assertThrows(StorageException.class, channel::close); - se.printStackTrace(System.out); assertAll( () -> assertThat(se.getCode()).isEqualTo(0), () -> assertThat(se.getReason()).isEqualTo("invalid"), @@ -127,6 +130,7 @@ public void scenario4() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_256KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -145,7 +149,7 @@ public void scenario4() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(0); @@ -203,6 +207,7 @@ public void scenario4_1() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -221,7 +226,7 @@ public void scenario4_1() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); @@ -231,7 +236,6 @@ public void scenario4_1() throws Exception { done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); StorageException se = assertThrows(StorageException.class, channel::close); - se.printStackTrace(System.out); assertAll( () -> assertThat(se.getCode()).isEqualTo(0), () -> assertThat(se.getReason()).isEqualTo("dataLoss"), @@ -282,6 +286,7 @@ public void scenario4_2() throws Exception { WriteObjectRequest.newBuilder() .setUploadId(uploadId) .setWriteOffset(_512KiB) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .setFinishWrite(true) .build(); WriteObjectResponse resp1 = @@ -300,7 +305,7 @@ public void scenario4_2() throws Exception { SettableApiFuture done = SettableApiFuture.create(); ResumableWrite resumableWrite = getResumableWrite(uploadId); - WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); @@ -310,7 +315,6 @@ public void scenario4_2() throws Exception { done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); StorageException se = assertThrows(StorageException.class, channel::close); - se.printStackTrace(System.out); assertAll( () -> assertThat(se.getCode()).isEqualTo(0), () -> assertThat(se.getReason()).isEqualTo("dataLoss"), diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java index daf4554cf7..e23b2a57f7 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java @@ -159,10 +159,7 @@ public void directUpload() throws IOException, InterruptedException, ExecutionEx SettableApiFuture result = SettableApiFuture.create(); try (GapicUnbufferedDirectWritableByteChannel c = new GapicUnbufferedDirectWritableByteChannel( - result, - segmenter, - sc.writeObjectCallable(), - new WriteCtx<>(reqFactory, Crc32cValue.zero()))) { + result, segmenter, sc.writeObjectCallable(), WriteCtx.of(reqFactory, HASHER))) { c.write(ByteBuffer.wrap(bytes)); } assertThat(result.get()).isEqualTo(resp); @@ -188,7 +185,7 @@ public void resumableUpload() throws IOException, InterruptedException, Executio result, segmenter, sc.writeObjectCallable(), - new WriteCtx<>(reqFactory, Crc32cValue.zero()), + WriteCtx.of(reqFactory, HASHER), RetrierWithAlg.attemptOnce(), Retrying::newCallContext); ArrayList debugMessages = new ArrayList<>(); @@ -270,7 +267,7 @@ public void resumableUpload_chunkAutomaticRetry() result, segmenter, sc.writeObjectCallable(), - new WriteCtx<>(reqFactory, Crc32cValue.zero()), + WriteCtx.of(reqFactory, HASHER), TestUtils.retrierFromStorageOptions(fake.getGrpcStorageOptions()) .withAlg(Retrying.alwaysRetry()), Retrying::newCallContext)) { @@ -322,7 +319,7 @@ public void resumableUpload_finalizeWhenWriteAndCloseCalledEvenWhenQuantumAligne result, segmenter, sc.writeObjectCallable(), - new WriteCtx<>(reqFactory, Crc32cValue.zero()), + WriteCtx.of(reqFactory, HASHER), RetrierWithAlg.attemptOnce(), Retrying::newCallContext); try { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java index fd4f0fd58e..9b1b2f7822 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java @@ -356,7 +356,7 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception { // TestUtils.defaultRetrier(), new DefaultRetrier(UnaryOperator.identity(), defaultRetryingDeps()), StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(), - new WriteCtx<>(resumableWrite), + WriteCtx.of(resumableWrite, s.chunkSegmenter.getHasher()), rf, s.copyBuffer); try (BufferedWritableByteChannel w = s.buffered(syncAndUpload)) { @@ -550,19 +550,22 @@ private void unexpected(StreamObserver respond, Message msg) { BlobInfo info = BlobInfo.newBuilder("b", "o").build(); SettableApiFuture resultFuture = SettableApiFuture.create(); BufferHandle recoverBufferHandle = BufferHandle.allocate(2); + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2); SyncAndUploadUnbufferedWritableByteChannel syncAndUpload = new SyncAndUploadUnbufferedWritableByteChannel( storage.storageClient.writeObjectCallable(), storage.storageClient.queryWriteStatusCallable(), resultFuture, - new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2), + chunkSegmenter, new DefaultRetrier(UnaryOperator.identity(), storage.getOptions()), StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(), - new WriteCtx<>( + WriteCtx.of( new ResumableWrite( reqStart, resStart, - id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build())), + id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build()), + chunkSegmenter.getHasher()), recoveryFileManager.newRecoveryFile(info), recoverBufferHandle); try (BufferedWritableByteChannel w = diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java index da0c504526..27ac48eb9b 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java @@ -85,6 +85,8 @@ public ImmutableList parameters() { int _24MiB = 24 * 1024 * 1024; return ImmutableList.of( + // empty object content + ChecksummedTestContent.of(new byte[0]), // small, single message single stream when resumable ChecksummedTestContent.of(gen.genBytes(15)), // med, multiple messages single stream when resumable