Skip to content

Commit

Permalink
fix: Add missing field error to row error message (#1752)
Browse files Browse the repository at this point in the history
* fix: Add missing field error to row error message

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Aug 17, 2022
1 parent 626752b commit 186d213
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 26 deletions.
Expand Up @@ -225,7 +225,7 @@ public static class AppendSerializtionError extends RuntimeException {
private final String streamName;

public AppendSerializtionError(String streamName, Map<Integer, String> rowIndexToErrorMessage) {
super(String.format("Append serializtion failed for writer: %s", streamName));
super(String.format("Append serialization failed for writer: %s", streamName));
this.rowIndexToErrorMessage = rowIndexToErrorMessage;
this.streamName = streamName;
}
Expand All @@ -239,6 +239,35 @@ public String getStreamName() {
}
}

/** This exception is used internally to handle field level parsing errors. */
public static class FieldParseError extends IllegalArgumentException {
private final String fieldName;
private final String bqType;
private final Throwable cause;

protected FieldParseError(String fieldName, String bqType, Throwable cause) {
this.fieldName = fieldName;
this.bqType = bqType;
this.cause = cause;
}

public String getFieldName() {
return fieldName;
}

public String getBqType() {
return bqType;
}

public Throwable getCause() {
return cause;
}

public String getMessage() {
return cause.getMessage();
}
}

/**
* This writer instance has either been closed by the user explicitly, or has encountered
* non-retriable errors.
Expand Down
Expand Up @@ -155,7 +155,19 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException exception) {
rowIndexToErrorMessage.put(i, exception.getMessage());
if (exception instanceof Exceptions.FieldParseError) {
Exceptions.FieldParseError ex = (Exceptions.FieldParseError) exception;
rowIndexToErrorMessage.put(
i,
"Field "
+ ex.getFieldName()
+ " failed to convert to "
+ ex.getBqType()
+ ". Error: "
+ ex.getCause().getMessage());
} else {
rowIndexToErrorMessage.put(i, exception.getMessage());
}
}
}

Expand Down
Expand Up @@ -221,11 +221,24 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
+ ")");
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
} else {
fillRepeatedField(
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
try {
if (!field.isRepeated()) {
fillField(
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
} else {
fillRepeatedField(
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
}
} catch (Exceptions.FieldParseError ex) {
throw ex;
} catch (Exception ex) {
// This function is recursively called, so this throw will be caught and throw directly out
// by the catch
// above.
throw new Exceptions.FieldParseError(
currentScope,
fieldSchema != null ? fieldSchema.getType().name() : field.getType().name(),
ex);
}
}

Expand Down Expand Up @@ -329,26 +342,17 @@ private static void fillField(
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ j
+ "] could not be converted to byte[]."));
}
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: " + currentScope + "[" + j + "] could not be converted to byte[]."));
}
protoMsg.setField(fieldDescriptor, bytes);
return;
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format("Error: " + currentScope + "could not be converted to byte[]."));
}
protoMsg.setField(fieldDescriptor, bytes);
return;
}
break;
case INT64:
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.SchemaTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
Expand Down Expand Up @@ -639,7 +640,46 @@ public void testMultipleAppendSerializtionErrors()
"JSONObject has fields unknown to BigQuery: root.not_foo.",
rowIndexToErrorMessage.get(0));
assertEquals(
"JSONObject does not have a string field at root.foo.", rowIndexToErrorMessage.get(2));
"Field root.foo failed to convert to STRING. Error: JSONObject does not have a string field at root.foo.",
rowIndexToErrorMessage.get(2));
}
}
}

@Test
public void testBadStringToNumericRowError()
throws DescriptorValidationException, IOException, InterruptedException {
TableSchema TABLE_SCHEMA =
TableSchema.newBuilder()
.addFields(
0,
TableFieldSchema.newBuilder()
.setName("test_field_type")
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.NULLABLE)
.build())
.build();
SchemaTest.StringType expectedProto =
SchemaTest.StringType.newBuilder().setTestFieldType("allen").build();
JSONObject foo = new JSONObject();
// put a field which is not part of the expected schema
foo.put("test_field_type", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected AppendSerializtionError");
} catch (AppendSerializtionError appendSerializtionError) {
Map<Integer, String> rowIndexToErrorMessage =
appendSerializtionError.getRowIndexToErrorMessage();
assertEquals(1, rowIndexToErrorMessage.size());
assertTrue(
rowIndexToErrorMessage
.get(0)
.startsWith("Field root.test_field_type failed to convert to NUMERIC. Error:"));
}
}
}
Expand Down
Expand Up @@ -1226,4 +1226,53 @@ public void testJsonAllFieldsNullValue() throws Exception {
JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}

@Test
public void testBadJsonFieldRepeated() throws Exception {
TableSchema ts =
TableSchema.newBuilder()
.addFields(
0,
TableFieldSchema.newBuilder()
.setName("test_repeated")
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.REPEATED)
.build())
.build();
JSONObject json = new JSONObject();
json.put("test_repeated", new JSONArray(new String[] {"123", "blah"}));

try {
DynamicMessage protoMsg =
JsonToProtoMessage.convertJsonToProtoMessage(RepeatedBytes.getDescriptor(), ts, json);
Assert.fail("Should fail");
} catch (Exceptions.FieldParseError ex) {
assertEquals(ex.getBqType(), "NUMERIC");
assertEquals(ex.getFieldName(), "root.test_repeated");
}
}

@Test
public void testBadJsonFieldIntRepeated() throws Exception {
TableSchema ts =
TableSchema.newBuilder()
.addFields(
0,
TableFieldSchema.newBuilder()
.setName("test_repeated")
.setType(TableFieldSchema.Type.DATE)
.setMode(TableFieldSchema.Mode.REPEATED)
.build())
.build();
JSONObject json = new JSONObject();
json.put("test_repeated", new JSONArray(new String[] {"blah"}));

try {
DynamicMessage protoMsg =
JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
Assert.fail("Should fail");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "Text 'blah' could not be parsed at index 0");
}
}
}

0 comments on commit 186d213

Please sign in to comment.