Skip to content

Commit

Permalink
feat: Add ignoreUnknownField support in JsonWriter (#1455)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️
  • Loading branch information
yirutang committed Dec 29, 2021
1 parent 973afcc commit 4616adb
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class JsonStreamWriter implements AutoCloseable {
private StreamWriter.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;
private boolean ignoreUnknownFields = false;

/**
* Constructs the JsonStreamWriter
Expand Down Expand Up @@ -80,6 +81,7 @@ private JsonStreamWriter(Builder builder)
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
this.ignoreUnknownFields = builder.ignoreUnknownFields;
}

/**
Expand Down Expand Up @@ -135,7 +137,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down Expand Up @@ -249,6 +252,7 @@ public static final class Builder {
private String endpoint;
private boolean createDefaultStream = false;
private String traceId;
private boolean ignoreUnknownFields = false;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -350,6 +354,18 @@ public Builder setTraceId(String traceId) {
return this;
}

/**
* Setter for a ignoreUnkownFields, if true, unknown Json fields to BigQuery will be ignored
* instead of error out.
*
* @param ignoreUnknownFields
* @return Builder
*/
public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
this.ignoreUnknownFields = ignoreUnknownFields;
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(
protoSchema, null, json, "root", /*topLevel=*/ true, false);
}

/**
Expand All @@ -85,7 +86,39 @@ public static DynamicMessage convertJsonToProtoMessage(
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
protoSchema,
tableSchema.getFieldsList(),
json,
"root",
/*topLevel=*/ true,
/*ignoreUnknownFields*/ false);
}

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
*
* @param protoSchema
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
* NUMERIC, BIGNUMERIC
* @param json
* @param ignoreUnknownFields allows unknown fields in JSON input to be ignored.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema,
tableSchema.getFieldsList(),
json,
"root",
/*topLevel=*/ true,
ignoreUnknownFields);
}

/**
Expand All @@ -102,7 +135,8 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
boolean topLevel,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
Expand All @@ -117,9 +151,11 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String jsonLowercaseName = jsonName.toLowerCase();
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
if (field == null) {
if (field == null && !ignoreUnknownFields) {
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
} else if (field == null) {
continue;
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
Expand All @@ -137,9 +173,10 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
} else {
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
fillRepeatedField(
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
}
}

Expand Down Expand Up @@ -174,7 +211,8 @@ private static void fillField(
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
String currentScope,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

java.lang.Object val = json.get(exactJsonKeyName);
Expand Down Expand Up @@ -303,7 +341,8 @@ private static void fillField(
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
/*topLevel =*/ false,
ignoreUnknownFields));
return;
}
break;
Expand Down Expand Up @@ -331,7 +370,8 @@ private static void fillRepeatedField(
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
String currentScope,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

JSONArray jsonArray;
Expand Down Expand Up @@ -478,7 +518,8 @@ private static void fillRepeatedField(
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
/*topLevel =*/ false,
ignoreUnknownFields));
} else {
fail = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,7 @@ private void cleanupInflightRequests() {
} finally {
this.lock.unlock();
}
log.fine(
"Cleaning "
+ localQueue.size()
+ " inflight requests with error: "
+ finalStatus.toString());
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
while (!localQueue.isEmpty()) {
localQueue.pollFirst().appendResult.setException(finalStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,4 +496,48 @@ public void testSimpleSchemaUpdate() throws Exception {
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
}
}

@Test
public void testWithoutIgnoreUnknownFields() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_unknown", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected ExecutionException");
} catch (Exception ex) {
assertEquals(
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
}
}
}

@Test
public void testWithIgnoreUnknownFields() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setIgnoreUnknownFields(true)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_unknown", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ public void testJsonStreamWriterWithDefaultStream()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) {
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
row1.put("test_str", "aaa");
Expand All @@ -365,6 +367,7 @@ public void testJsonStreamWriterWithDefaultStream()
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
.toByteArray()
}));
row1.put("unknown_field", "a");
row1.put(
"test_datetime",
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
Expand Down

0 comments on commit 4616adb

Please sign in to comment.