Skip to content

Commit

Permalink
fix: update Grpc Retry Conformance after new additions to testbench (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
BenWhitehead committed Apr 17, 2024
1 parent 4ac5fc1 commit 09043c5
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 355 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public ResultRetryAlgorithm<?> getFor(CreateNotificationConfigRequest req) {
}

public ResultRetryAlgorithm<?> getFor(DeleteBucketRequest req) {
return retryStrategy.getNonidempotentHandler();
return retryStrategy.getIdempotentHandler();
}

public ResultRetryAlgorithm<?> getFor(DeleteHmacKeyRequest req) {
return retryStrategy.getNonidempotentHandler();
return retryStrategy.getIdempotentHandler();
}

public ResultRetryAlgorithm<?> getFor(DeleteNotificationConfigRequest req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,32 @@ public Blob create(
@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
return createFrom(blobInfo, content, options);
} catch (IOException e) {
requireNonNull(blobInfo, "blobInfo must be non null");
InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES));

Opts<ObjectTargetOpt> optsWithDefaults = Opts.unwrap(options).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
Hasher hasher = Hasher.enabled();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
.build();

try (UnbufferedWritableByteChannel c = session.open()) {
ByteStreams.copy(Channels.newChannel(inputStreamParam), c);
}
ApiFuture<WriteObjectResponse> responseApiFuture = session.getResult();
return this.getBlob(responseApiFuture);
} catch (IOException | ApiException e) {
throw StorageException.coalesce(e);
}
}
Expand Down Expand Up @@ -549,17 +573,20 @@ public boolean delete(String bucket, BucketSourceOption... options) {
DeleteBucketRequest.Builder builder =
DeleteBucketRequest.newBuilder().setName(bucketNameCodec.encode(bucket));
DeleteBucketRequest req = opts.deleteBucketsRequest().apply(builder).build();
try {
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.deleteBucketCallable().call(req, merge),
Decoder.identity());
return true;
} catch (StorageException e) {
return false;
}
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Boolean.TRUE.equals(
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
storageClient.deleteBucketCallable().call(req, merge);
return true;
} catch (NotFoundException e) {
return false;
}
},
Decoder.identity()));
}

@Override
Expand Down Expand Up @@ -760,11 +787,19 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable(),
getOptions(),
retryAlgorithmManager.idempotent(),
() -> startResumableWrite(grpcCallContext, req),
() -> wrapped,
hasher);
}

Expand Down

0 comments on commit 09043c5

Please sign in to comment.