Skip to content

Commit

Permalink
fix: update grpc WriteObject response handling to provide context whe…
Browse files Browse the repository at this point in the history
…n a failure happens (#2532)
  • Loading branch information
BenWhitehead committed May 8, 2024
1 parent 3e573f7 commit 170a3f5
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

/**
* This exception is used to preserve the caller's stacktrace when invoking an async task in a sync
* context. It will be added as a suppressed exception when propagating the async exception. This
* allows callers to catch ApiException thrown in an async operation, while still maintaining the
* call site.
*/
public final class AsyncStorageTaskException extends RuntimeException {
// mimic of com.google.api.gax.rpc.AsyncTaskException which doesn't have a public constructor
// if that class is ever made public, make this class extend it
AsyncStorageTaskException() {
super("Asynchronous task failed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.OutOfRangeException;
Expand Down Expand Up @@ -208,14 +209,19 @@ private void flush(
deps,
alg,
() -> {
Observer observer = new Observer(content, finalizing);
Observer observer = new Observer(content, finalizing, segments, internalContext);
ApiStreamObserver<WriteObjectRequest> write = callable.clientStreamingCall(observer);

for (WriteObjectRequest message : segments) {
write.onNext(message);
}
write.onCompleted();
observer.await();
try {
observer.await();
} catch (Throwable t) {
t.addSuppressed(new AsyncStorageTaskException());
throw t;
}
return null;
},
Decoder.identity());
Expand All @@ -230,13 +236,21 @@ class Observer implements ApiStreamObserver<WriteObjectResponse> {

private final RewindableContent content;
private final boolean finalizing;
private final List<WriteObjectRequest> segments;
private final GrpcCallContext context;

private final SettableApiFuture<Void> invocationHandle;
private volatile WriteObjectResponse last;

Observer(@Nullable RewindableContent content, boolean finalizing) {
Observer(
@Nullable RewindableContent content,
boolean finalizing,
@NonNull List<WriteObjectRequest> segments,
GrpcCallContext context) {
this.content = content;
this.finalizing = finalizing;
this.segments = segments;
this.context = context;
this.invocationHandle = SettableApiFuture.create();
}

Expand All @@ -250,10 +264,20 @@ public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
open = false;
invocationHandle.setException(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException());
} else {
invocationHandle.setException(t);
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
} else if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
// unusual case, and it should not cause a significant overhead given its rarity.
StorageException tmp = StorageException.asStorageException((ApiException) t);
StorageException storageException =
ResumableSessionFailureScenario.toStorageException(
tmp.getCode(), tmp.getMessage(), tmp.getReason(), segments, null, context, t);
invocationHandle.setException(storageException);
}
}

Expand All @@ -276,7 +300,8 @@ public void onCompleted() {
writeCtx.getTotalSentBytes().set(persistedSize);
writeCtx.getConfirmedBytes().set(persistedSize);
} else {
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException(
segments, last, context, null);
}
} else if (finalizing && last.hasResource()) {
long totalSentBytes = writeCtx.getTotalSentBytes().get();
Expand All @@ -285,22 +310,28 @@ public void onCompleted() {
writeCtx.getConfirmedBytes().set(finalSize);
resultFuture.set(last);
} else if (finalSize < totalSentBytes) {
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
segments, last, context, null);
} else {
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(
segments, last, context, null);
}
} else if (!finalizing && last.hasResource()) {
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException(
segments, last, context, null);
} else if (finalizing && last.hasPersistedSize()) {
long totalSentBytes = writeCtx.getTotalSentBytes().get();
long persistedSize = last.getPersistedSize();
if (persistedSize < totalSentBytes) {
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException(
segments, last, context, null);
} else {
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException(
segments, last, context, null);
}
} else {
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
segments, last, context, null);
}
} catch (Throwable se) {
open = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.Utils.ifNonNull;

import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.StorageException.IOExceptionCallable;
import com.google.common.io.CharStreams;
import com.google.protobuf.MessageOrBuilder;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.ParametersAreNonnullByDefault;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

@ParametersAreNonnullByDefault
Expand Down Expand Up @@ -69,6 +81,10 @@ enum ResumableSessionFailureScenario {
private static final String PREFIX_I = "\t|< ";
private static final String PREFIX_O = "\t|> ";
private static final String PREFIX_X = "\t| ";
// define some constants for tab widths that are more compressed that the literals
private static final String T1 = "\t";
private static final String T2 = "\t\t";
private static final String T3 = "\t\t\t";

private static final Predicate<String> includedHeaders =
matches("Content-Length")
Expand All @@ -78,6 +94,7 @@ enum ResumableSessionFailureScenario {
.or(matches("Range"))
.or(startsWith("X-Goog-Stored-"))
.or(matches("X-Goog-GCS-Idempotency-Token"))
.or(matches("X-Goog-request-params"))
.or(matches("X-GUploader-UploadID"));

private static final Predicate<Map.Entry<String, ?>> includeHeader =
Expand Down Expand Up @@ -116,8 +133,12 @@ StorageException toStorageException(
return toStorageException(code, message, reason, uploadId, resp, cause, contentCallable);
}

StorageException toStorageException() {
return new StorageException(code, message, reason, null);
StorageException toStorageException(
@NonNull List<@NonNull WriteObjectRequest> reqs,
@Nullable WriteObjectResponse resp,
@NonNull GrpcCallContext context,
@Nullable Throwable cause) {
return toStorageException(code, message, reason, reqs, resp, context, cause);
}

static StorageException toStorageException(
Expand All @@ -136,6 +157,102 @@ static StorageException toStorageException(
return se;
}

static StorageException toStorageException(
int code,
String message,
@Nullable String reason,
@NonNull List<@NonNull WriteObjectRequest> reqs,
@Nullable WriteObjectResponse resp,
@NonNull GrpcCallContext context,
@Nullable Throwable cause) {
final StringBuilder sb = new StringBuilder();
sb.append(message);
// request context
Map<String, List<String>> extraHeaders = context.getExtraHeaders();
recordHeadersTo(extraHeaders, PREFIX_O, sb);
int length = reqs.size();
for (int i = 0; i < length; i++) {
if (i == 0) {
sb.append("\n").append(PREFIX_O).append("[");
} else {
sb.append(",");
}
WriteObjectRequest req = reqs.get(i);
sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{");
if (req.hasUploadId()) {
sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId());
}
long writeOffset = req.getWriteOffset();
if (req.hasChecksummedData()) {
ChecksummedData checksummedData = req.getChecksummedData();
sb.append("\n").append(PREFIX_O).append(T2);
sb.append(
String.format(
"checksummed_data: {range: [%d:%d]",
writeOffset, writeOffset + checksummedData.getContent().size()));
if (checksummedData.hasCrc32C()) {
sb.append(", crc32c: ").append(checksummedData.getCrc32C());
}
sb.append("}");
} else {
sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset);
}
if (req.getFinishWrite()) {
sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true");
}
if (req.hasObjectChecksums()) {
ObjectChecksums objectChecksums = req.getObjectChecksums();
sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{");
fmt(objectChecksums, PREFIX_O, T3, sb);
sb.append("\n").append(PREFIX_O).append(T2).append("}");
}
sb.append("\n").append(PREFIX_O).append("\t}");
if (i == length - 1) {
sb.append("\n").append(PREFIX_O).append("]");
}
}

sb.append("\n").append(PREFIX_X);

// response context
if (resp != null) {
sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{");
fmt(resp, PREFIX_I, T1, sb);
sb.append("\n").append(PREFIX_I).append("}");
sb.append("\n").append(PREFIX_X);
}

if (cause != null) {
if (cause instanceof ApiException) {
ApiException apiException = (ApiException) cause;
Throwable cause1 = apiException.getCause();
if (cause1 instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) cause1;
sb.append("\n").append(PREFIX_I).append(statusRuntimeException.getStatus());
ifNonNull(
statusRuntimeException.getTrailers(),
t -> sb.append("\n").append(PREFIX_I).append(t));
} else {
sb.append("\n")
.append(PREFIX_I)
.append("code: ")
.append(apiException.getStatusCode().toString());
ifNonNull(
apiException.getReason(),
r -> sb.append("\n").append(PREFIX_I).append("reason: ").append(r));
ifNonNull(
apiException.getDomain(),
d -> sb.append("\n").append(PREFIX_I).append("domain: ").append(d));
ifNonNull(
apiException.getErrorDetails(),
e -> sb.append("\n").append(PREFIX_I).append("errorDetails: ").append(e));
}
sb.append("\n").append(PREFIX_X);
}
}
return new StorageException(code, sb.toString(), reason, cause);
}

static StorageException toStorageException(
int overrideCode,
String message,
Expand Down Expand Up @@ -213,14 +330,21 @@ private static Predicate<String> startsWith(String prefix) {
}

private static void recordHeaderTo(HttpHeaders h, String prefix, StringBuilder sb) {
h.entrySet().stream()
.filter(includeHeader)
.forEach(
e -> {
String key = e.getKey();
String value = headerValueToString(e.getValue());
sb.append("\n").append(prefix).append(key).append(": ").append(value);
});
h.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
}

private static void recordHeadersTo(
Map<String, List<String>> headers, String prefix, StringBuilder sb) {
headers.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
}

private static <V> Consumer<Map.Entry<String, V>> writeHeaderValue(
String prefix, StringBuilder sb) {
return e -> {
String key = e.getKey();
String value = headerValueToString(e.getValue());
sb.append("\n").append(prefix).append(key).append(": ").append(value);
};
}

private static String headerValueToString(Object o) {
Expand All @@ -233,4 +357,18 @@ private static String headerValueToString(Object o) {

return o.toString();
}

private static void fmt(
MessageOrBuilder msg,
@SuppressWarnings("SameParameterValue") String prefix,
String indentation,
StringBuilder sb) {
String string = msg.toString();
// drop the final new line before prefixing
string = string.replaceAll("\n$", "");
sb.append("\n")
.append(prefix)
.append(indentation)
.append(string.replaceAll("\r?\n", "\n" + prefix + indentation));
}
}

0 comments on commit 170a3f5

Please sign in to comment.