Skip to content

Commit

Permalink
chore: fix breaking change to continue supporting users who may be consuming errors as StatusRuntimeException (#1822)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawal-siddharth committed Oct 4, 2022
1 parent 0d9baba commit 4096d41
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 23 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -60,4 +60,9 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$AppendSerializtionError</className>
<method>Exceptions$AppendSerializtionError(java.lang.String, java.util.Map)</method>
</difference>
</differences>
Expand Up @@ -643,7 +643,11 @@ private void requestCallback(AppendRowsResponse response) {
rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage());
}
AppendSerializtionError exception =
new AppendSerializtionError(streamName, rowIndexToErrorMessage);
new AppendSerializtionError(
response.getError().getCode(),
response.getError().getMessage(),
streamName,
rowIndexToErrorMessage);
requestWrapper.appendResult.setException(exception);
} else {
StatusRuntimeException exception =
Expand Down
Expand Up @@ -217,15 +217,20 @@ public static StorageException toStorageException(Throwable exception) {

/**
* This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to
* Proto serializtion fails. The exception contains a Map of indexes of faulty lines and the
* corresponding error message.
 * Proto serialization fails. It can also be thrown by the server in case rows contain invalid
 * data. The exception contains a Map of indexes of faulty rows and the corresponding error
 * message.
*/
public static class AppendSerializtionError extends RuntimeException {
public static class AppendSerializtionError extends StatusRuntimeException {
private final Map<Integer, String> rowIndexToErrorMessage;
private final String streamName;

public AppendSerializtionError(String streamName, Map<Integer, String> rowIndexToErrorMessage) {
super(String.format("Append serialization failed for writer: %s", streamName));
public AppendSerializtionError(
int codeValue,
String description,
String streamName,
Map<Integer, String> rowIndexToErrorMessage) {
super(Status.fromCodeValue(codeValue).withDescription(description));
this.rowIndexToErrorMessage = rowIndexToErrorMessage;
this.streamName = streamName;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import com.google.rpc.Code;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -221,7 +222,11 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
}

if (!rowIndexToErrorMessage.isEmpty()) {
throw new AppendSerializtionError(streamName, rowIndexToErrorMessage);
throw new AppendSerializtionError(
Code.INVALID_ARGUMENT.getNumber(),
"Append serialization failed for writer: " + streamName,
streamName,
rowIndexToErrorMessage);
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
Expand Down
Expand Up @@ -74,6 +74,18 @@ public class ITBigQueryWriteManualClientTest {
private static String tableIdEU;
private static BigQuery bigquery;

/**
 * Immutable test fixture pairing a string payload with timestamp components
 * (epoch seconds plus a nanosecond adjustment), used to populate
 * FooTimestampType rows in the row-error integration test.
 */
public class StringWithSecondsNanos {
  /** String payload for the "foo" column. */
  public final String foo;
  /** Epoch seconds for the "bar" TIMESTAMP column. */
  public final long seconds;
  /** Nanosecond adjustment for the "bar" TIMESTAMP column. */
  public final int nanos;

  public StringWithSecondsNanos(String fooParam, long secondsParam, int nanosParam) {
    // Fields are final: instances are only read after construction.
    foo = fooParam;
    seconds = secondsParam;
    nanos = nanosParam;
  }
}

@BeforeClass
public static void beforeClass() throws IOException {
client = BigQueryWriteClient.create();
Expand Down Expand Up @@ -187,6 +199,23 @@ ProtoRows CreateProtoRowsComplex(String[] messages) {
return rows.build();
}

/**
 * Serializes each fixture into a FooTimestampType proto row (string "foo"
 * plus a Timestamp "bar") and collects them into a ProtoRows batch.
 */
ProtoRows CreateProtoRowsMixed(StringWithSecondsNanos[] messages) {
  ProtoRows.Builder batch = ProtoRows.newBuilder();
  for (StringWithSecondsNanos entry : messages) {
    com.google.protobuf.Timestamp bar =
        com.google.protobuf.Timestamp.newBuilder()
            .setSeconds(entry.seconds)
            .setNanos(entry.nanos)
            .build();
    FooTimestampType row =
        FooTimestampType.newBuilder().setFoo(entry.foo).setBar(bar).build();
    batch.addSerializedRows(row.toByteString());
  }
  return batch.build();
}

@Test
public void testBatchWriteWithCommittedStreamEU()
throws IOException, InterruptedException, ExecutionException {
Expand Down Expand Up @@ -318,7 +347,7 @@ public void testJsonStreamWriterCommittedStream()
public void testRowErrors()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "_default";
String tableName = "TestBadRowsTable";
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
Expand All @@ -327,25 +356,26 @@ public void testRowErrors()
com.google.cloud.bigquery.Field.newBuilder(
"foo", StandardSQLTypeName.STRING)
.setMaxLength(10L)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"bar", StandardSQLTypeName.TIMESTAMP)
.build())))
.build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(ProtoSchemaConverter.convert(FooTimestampType.getDescriptor()))
.build();

LOG.info("Sending three messages");
StringWithSecondsNanos[] myBadList = {
new StringWithSecondsNanos("aaabbbcccddd", 1663821424, 0),
new StringWithSecondsNanos("bbb", Long.MIN_VALUE, 0),
new StringWithSecondsNanos("cccdddeeefffggg", 1663621424, 0)
};
ApiFuture<AppendRowsResponse> futureResponse =
streamWriter.append(
CreateProtoRows(new String[] {"aaabbbcccddd", "bbb", "cccdddeeefffggg"}), -1);
streamWriter.append(CreateProtoRowsMixed(myBadList), -1);
AppendRowsResponse actualResponse = null;
try {
actualResponse = futureResponse.get();
Expand All @@ -358,6 +388,9 @@ public void testRowErrors()
assertEquals(
"Field foo: STRING(10) has maximum length 10 but got a value with length 12 on field foo.",
e.getRowIndexToErrorMessage().get(0));
assertEquals(
"Timestamp field value is out of range: -9223372036854775808 on field bar.",
e.getRowIndexToErrorMessage().get(1));
assertEquals(
"Field foo: STRING(10) has maximum length 10 but got a value with length 15 on field foo.",
e.getRowIndexToErrorMessage().get(2));
Expand All @@ -366,18 +399,37 @@ public void testRowErrors()
}
}
assertEquals(null, actualResponse);

LOG.info("Resending with three good messages");
StringWithSecondsNanos[] myGoodList = {
new StringWithSecondsNanos("aaa", 1664821424, 0),
new StringWithSecondsNanos("bbb", 1663821424, 0),
new StringWithSecondsNanos("ccc", 1664801424, 0)
};
ApiFuture<AppendRowsResponse> futureResponse1 =
streamWriter.append(CreateProtoRows(new String[] {"aaa", "bbb", "ccc"}), -1);
assertEquals(3, futureResponse1.get().getAppendResult().getOffset().getValue());
streamWriter.append(CreateProtoRowsMixed(myGoodList), -1);
assertEquals(0, futureResponse1.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iterDump = result.getValues().iterator();
while (iterDump.hasNext()) {
FieldValueList currentRow = iterDump.next();
LOG.info("Table row contains " + currentRow.size() + " field values.");
LOG.info("Table column has foo: " + currentRow.get(0).getStringValue());
LOG.info("Table column has bar: " + currentRow.get(1).getTimestampValue());
}

Iterator<FieldValueList> iter = result.getValues().iterator();
FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals(1664821424000000L, currentRow.get(1).getTimestampValue());
currentRow = iter.next();
assertEquals("bbb", currentRow.get(0).getStringValue());
assertEquals(1663821424000000L, currentRow.get(1).getTimestampValue());
currentRow = iter.next();
assertEquals("ccc", currentRow.get(0).getStringValue());
assertEquals(1664801424000000L, currentRow.get(1).getTimestampValue());
assertEquals(false, iter.hasNext());
}

Expand Down
6 changes: 6 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/test.proto
Expand Up @@ -18,6 +18,7 @@ syntax = "proto2";
package com.google.cloud.bigquery.storage.test;

import "google/cloud/bigquery/storage/v1/annotations.proto";
import "google/protobuf/timestamp.proto";

enum TestEnum {
TestEnum0 = 0;
Expand Down Expand Up @@ -76,6 +77,11 @@ message UpdatedFooType2 {
optional string baz = 3;
}

// Test row type pairing a string column with a well-known Timestamp column;
// used by the row-error integration test to exercise TIMESTAMP validation.
message FooTimestampType {
  optional string foo = 1;
  optional .google.protobuf.Timestamp bar = 2;
}

message DuplicateType {
optional TestEnum f1 = 1;
optional TestEnum f2 = 2;
Expand Down

0 comments on commit 4096d41

Please sign in to comment.