Skip to content

Commit

Permalink
feat: add support for StorageError (#1391)
Browse files Browse the repository at this point in the history
* feat: add support for StorageError

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* add license header

* add error parsing in doneCallback

* address feedback

* add test case for onDone

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* refactor code

* lint

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* apply changes from code review

Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>

* add integration test for onDoneCallback

* make error check more vigorous

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 15, 2021
1 parent 38f5eb9 commit 176dc8b
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
public final class Exceptions {
/** Main Storage Exception. Might contain map of streams to errors for that stream. */
public static class StorageException extends RuntimeException {

private final ImmutableMap<String, GrpcStatusCode> errors;
private final String streamName;

private StorageException() {
this(null, null, null, ImmutableMap.of());
}

private StorageException(
@Nullable String message,
@Nullable Throwable cause,
@Nullable String streamName,
ImmutableMap<String, GrpcStatusCode> errors) {
super(message, cause);
this.streamName = streamName;
this.errors = errors;
}

public ImmutableMap<String, GrpcStatusCode> getErrors() {
return errors;
}

public String getStreamName() {
return streamName;
}
}

/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
protected StreamFinalizedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
protected SchemaMismatchedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

private static StorageError toStorageError(com.google.rpc.Status rpcStatus) {
for (Any detail : rpcStatus.getDetailsList()) {
if (detail.is(StorageError.class)) {
try {
return detail.unpack(StorageError.class);
} catch (InvalidProtocolBufferException protoException) {
throw new IllegalStateException(protoException);
}
}
}
return null;
}

/**
* Converts a c.g.rpc.Status into a StorageException, if possible. Examines the embedded
* StorageError, and potentially returns a {@link StreamFinalizedException} or {@link
* SchemaMismatchedException} (both derive from StorageException). If there is no StorageError, or
* the StorageError is a different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(
com.google.rpc.Status rpcStatus, Throwable exception) {
StorageError error = toStorageError(rpcStatus);
if (error == null) {
return null;
}
switch (error.getCode()) {
case STREAM_FINALIZED:
return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception);

case SCHEMA_MISMATCH_EXTRA_FIELDS:
return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception);

default:
return null;
}
}

/**
* Converts a Throwable into a StorageException, if possible. Examines the embedded error message,
* and potentially returns a {@link StreamFinalizedException} or {@link SchemaMismatchedException}
* (both derive from StorageException). If there is no StorageError, or the StorageError is a
* different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(Throwable exception) {
// TODO: switch to using rpcStatus when cl/408735437 is rolled out
// com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
Status grpcStatus = Status.fromThrowable(exception);
String message = exception.getMessage();
String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
Pattern streamPattern = Pattern.compile(streamPatternString);
if (message == null) {
return null;
}
// TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("input schema has more fields than bigquery schema")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new SchemaMismatchedException(entity, message, exception);
}
// TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("stream has been finalized and cannot be appended")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new StreamFinalizedException(entity, message, exception);
}
return null;
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,17 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}
if (response.hasError()) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
}
} else {
requestWrapper.appendResult.set(response);
}
Expand All @@ -482,6 +488,10 @@ private void doneCallback(Throwable finalStatus) {
} finally {
this.lock.unlock();
}
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
if (storageException != null) {
this.connectionFinalStatus = storageException;
}
}

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -304,6 +306,96 @@ public void testAppendSuccessAndInStreamError() throws Exception {
writer.close();
}

@Test
public void testAppendFailedSchemaError() throws Exception {
StreamWriter writer = getTestStreamWriter();

StorageError storageError =
StorageError.newBuilder()
.setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
.setEntity("foobar")
.build();
com.google.rpc.Status statusProto =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT.getHttpStatusCode())
.addDetails(Any.pack(storageError))
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setError(statusProto).build());
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertEquals("foobar", actualError.getStreamName());
assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue());

writer.close();
}

@Test
public void testAppendFailedOnDone() throws Exception {
StreamWriter writer = getTestStreamWriter();

StatusRuntimeException exception =
new StatusRuntimeException(
io.grpc.Status.INVALID_ARGUMENT.withDescription(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(exception);

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertTrue(
actualError
.getMessage()
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

writer.close();
}

// TODO(stephwang): update test case to below when toStorageException is updated
// @Test
// public void testAppendFailedOnDone2() throws Exception {
// StreamWriter writer = getTestStreamWriter();
//
// StorageError storageError =
// StorageError.newBuilder()
// .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
// .setEntity("foobar")
// .build();
// com.google.rpc.Status statusProto =
// com.google.rpc.Status.newBuilder()
// .addDetails(Any.pack(storageError))
// .build();
//
// StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto);
//
// testBigQueryWrite.addResponse(createAppendResponse(0));
// testBigQueryWrite.addException(exception);
//
// ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
// ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
//
// assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
// Exceptions.SchemaMismatchedException actualError =
// assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
// assertEquals("foobar", actualError.getStreamName());
//
// writer.close();
// }

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ ProtoRows CreateProtoRows(String[] messages) {
return rows.build();
}

ProtoRows CreateProtoRowsMultipleColumns(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
UpdatedFooType foo = UpdatedFooType.newBuilder().setFoo(message).setBar(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

ProtoRows CreateProtoRowsComplex(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
Expand Down Expand Up @@ -499,6 +508,68 @@ public void testStreamError() throws IOException, InterruptedException, Executio
}
}

@Test
public void testStreamSchemaMisMatchError() throws IOException, InterruptedException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());

try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor()))
.build()) {
// Create a proto row that has extra fields than the table schema defined which should trigger
// the SCHEMA_MISMATCH_EXTRA_FIELDS error
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema");
}
}
}

@Test
public void testStreamFinalizedError()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor()))
.build()) {
// Finalize the stream in order to trigger STREAM_FINALIZED error
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
// Try to append to a finalized stream
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// //TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended");
}
}
}

@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
Expand Down

0 comments on commit 176dc8b

Please sign in to comment.