Skip to content
Permalink
Browse files
fix: update storageError support due to server side enhancement (#1456)
* fix: update storageError support due to server side enhancement

client-side changes corresponding to cl/408735437

* update
  • Loading branch information
stephaniewang526 committed Dec 30, 2021
1 parent 4616adb commit 6243ad5cba61d4dae7f4ceb60b09c625e7589215
@@ -19,10 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.grpc.protobuf.StatusProto;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
@@ -124,30 +121,8 @@ public static StorageException toStorageException(
*/
@Nullable
public static StorageException toStorageException(Throwable exception) {
// TODO: switch to using rpcStatus when cl/408735437 is rolled out
// com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
Status grpcStatus = Status.fromThrowable(exception);
String message = exception.getMessage();
String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
Pattern streamPattern = Pattern.compile(streamPatternString);
if (message == null) {
return null;
}
// TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("input schema has more fields than bigquery schema")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new SchemaMismatchedException(entity, message, exception);
}
// TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("stream has been finalized and cannot be appended")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new StreamFinalizedException(entity, message, exception);
}
return null;
com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
return toStorageException(rpcStatus, exception);
}

private Exceptions() {}
@@ -590,18 +590,15 @@ private void doneCallback(Throwable finalStatus) {
+ " for stream "
+ streamName);
} else {
this.connectionFinalStatus = finalStatus;
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
this.connectionFinalStatus = storageException != null ? storageException : finalStatus;
log.info(
"Stream finished with error " + finalStatus.toString() + " for stream " + streamName);
}
}
} finally {
this.lock.unlock();
}
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
if (storageException != null) {
this.connectionFinalStatus = storageException;
}
}

@GuardedBy("lock")
@@ -343,64 +343,6 @@ public void testAppendFailedSchemaError() throws Exception {
writer.close();
}

@Test
public void testAppendFailedOnDone() throws Exception {
StreamWriter writer = getTestStreamWriter();

StatusRuntimeException exception =
new StatusRuntimeException(
io.grpc.Status.INVALID_ARGUMENT.withDescription(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(exception);

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertTrue(
actualError
.getMessage()
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

writer.close();
}

// TODO(stephwang): update test case to below when toStorageException is updated
// @Test
// public void testAppendFailedOnDone2() throws Exception {
// StreamWriter writer = getTestStreamWriter();
//
// StorageError storageError =
// StorageError.newBuilder()
// .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
// .setEntity("foobar")
// .build();
// com.google.rpc.Status statusProto =
// com.google.rpc.Status.newBuilder()
// .addDetails(Any.pack(storageError))
// .build();
//
// StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto);
//
// testBigQueryWrite.addResponse(createAppendResponse(0));
// testBigQueryWrite.addException(exception);
//
// ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
// ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
//
// assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
// Exceptions.SchemaMismatchedException actualError =
// assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
// assertEquals("foobar", actualError.getStreamName());
//
// writer.close();
// }

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
@@ -745,10 +745,9 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// TODO(stephwang): update test case when toStroageException is updated
assertEquals(Exceptions.SchemaMismatchedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema");
.contains("Schema mismatch due to extra fields in user schema");
}
}
}
@@ -777,10 +776,8 @@ public void testStreamFinalizedError()
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// //TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended");
assertEquals(Exceptions.StreamFinalizedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage()).contains("Stream is finalized");
}
}
}

0 comments on commit 6243ad5

Please sign in to comment.