Skip to content

Commit

Permalink
feat: remove IgnoreUnknownFields support on JsonStreamWriter (#757)
Browse files Browse the repository at this point in the history
* feat! Remove IgnoreUnknownFields support for JsonStreamWriter

* .

* .

* .

* .

* .

* .

* .

* .

* .

* add clirr-ignored-differences.xml for breaking changes

Co-authored-by: yirutang <yiru@google.com>
  • Loading branch information
stephaniewang526 and yirutang committed Dec 30, 2020
1 parent f3c897f commit 0988105
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 187 deletions.
26 changes: 26 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- TODO(stephwang): To be removed after the release -->
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter</className>
<differenceType>7005</differenceType>
<method>com.google.api.core.ApiFuture append(org.json.JSONArray, boolean)</method>
<from>com.google.api.core.ApiFuture append(org.json.JSONArray, boolean)</from>
<to>com.google.api.core.ApiFuture append(org.json.JSONArray, long)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter</className>
<differenceType>7004</differenceType>
<method>com.google.api.core.ApiFuture append(org.json.JSONArray, long, boolean)</method>
<from>com.google.api.core.ApiFuture append(org.json.JSONArray, long, boolean)</from>
<to>com.google.api.core.ApiFuture append(org.json.JSONArray)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessage</className>
<differenceType>7004</differenceType>
<method>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</method>
<from>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</from>
<to>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject)</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ private JsonStreamWriter(Builder builder)
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table.
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, boolean allowUnknownFields) {
return append(jsonArr, -1, allowUnknownFields);
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
return append(jsonArr, -1);
}

/**
Expand All @@ -109,20 +108,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, boolean allowUnkn
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table.
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(
JSONArray jsonArr, long offset, boolean allowUnknownFields) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json, allowUnknownFields);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@ public class JsonToProtoMessage {
*
* @param protoSchema
* @param json
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, JSONObject json, boolean allowUnknownFields)
public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, JSONObject json)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, json, "root", /*topLevel=*/ true, allowUnknownFields);
return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -67,24 +64,18 @@ public static DynamicMessage convertJsonToProtoMessage(
* @param protoSchema
* @param json
* @param jsonScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @param topLevel checks if root level has any matching fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema,
JSONObject json,
String jsonScope,
boolean topLevel,
boolean allowUnknownFields)
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
String[] jsonNames = JSONObject.getNames(json);
if (jsonNames == null) {
return protoMsg.build();
}
int matchedFields = 0;
for (int i = 0; i < jsonNames.length; i++) {
String jsonName = jsonNames[i];
// We want lowercase here to support case-insensitive data writes.
Expand All @@ -93,27 +84,16 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
if (field == null) {
if (!allowUnknownFields) {
throw new IllegalArgumentException(
String.format(
"JSONObject has fields unknown to BigQuery: %s. Set allowUnknownFields to True to allow unknown fields.",
currentScope));
} else {
continue;
}
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
matchedFields++;
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope, allowUnknownFields);
fillField(protoMsg, field, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope, allowUnknownFields);
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
}
}

if (matchedFields == 0 && topLevel) {
throw new IllegalArgumentException(
"There are no matching fields found for the JSONObject and the protocol buffer descriptor.");
}
DynamicMessage msg;
try {
msg = protoMsg.build();
Expand All @@ -139,16 +119,14 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
* @param json
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
JSONObject json,
String exactJsonKeyName,
String currentScope,
boolean allowUnknownFields)
String currentScope)
throws IllegalArgumentException {

java.lang.Object val = json.get(exactJsonKeyName);
Expand Down Expand Up @@ -204,8 +182,7 @@ private static void fillField(
fieldDescriptor.getMessageType(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false,
allowUnknownFields));
/*topLevel =*/ false));
return;
}
break;
Expand All @@ -224,16 +201,14 @@ private static void fillField(
* @param json If root level has no matching fields, throws exception.
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
JSONObject json,
String exactJsonKeyName,
String currentScope,
boolean allowUnknownFields)
String currentScope)
throws IllegalArgumentException {

JSONArray jsonArray;
Expand Down Expand Up @@ -305,8 +280,7 @@ private static void fillRepeatedField(
fieldDescriptor.getMessageType(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false,
allowUnknownFields));
/*topLevel =*/ false));
} else {
fail = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ public void testFlushAllFailed() throws Exception {
.build();

testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ public void testSingleAppendSimpleJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
Expand Down Expand Up @@ -299,8 +298,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);

assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
Expand Down Expand Up @@ -357,7 +355,7 @@ public void testMultipleAppendSimpleJson() throws Exception {
.build());
ApiFuture<AppendRowsResponse> appendFuture;
for (int i = 0; i < 4; i++) {
appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false);
appendFuture = writer.append(jsonArr);
assertEquals((long) i, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
Expand Down Expand Up @@ -443,8 +441,7 @@ public void testSingleAppendComplexJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);

assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
Expand Down Expand Up @@ -495,8 +492,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);

int millis = 0;
while (millis <= 10000) {
Expand Down Expand Up @@ -532,8 +528,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(updatedJsonArr);

millis = 0;
while (millis <= 10000) {
Expand Down Expand Up @@ -570,8 +565,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray updatedJsonArr2 = new JSONArray();
updatedJsonArr2.put(updatedFoo2);

ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(updatedJsonArr2);

assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
assertEquals(
Expand Down Expand Up @@ -614,8 +608,7 @@ public void testAppendOutOfRangeException() throws Exception {
foo.put("foo", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
try {
appendFuture.get();
Assert.fail("expected ExecutionException");
Expand Down Expand Up @@ -644,8 +637,7 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
foo.put("foo", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
try {
appendFuture.get();
Assert.fail("expected ExecutionException");
Expand All @@ -668,8 +660,7 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(updatedJsonArr);
assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue());
appendFuture2.get();
assertEquals(
Expand Down Expand Up @@ -727,12 +718,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
Expand Down Expand Up @@ -796,8 +784,7 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture4 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
assertEquals(
Expand Down Expand Up @@ -857,8 +844,7 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down Expand Up @@ -940,8 +926,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down Expand Up @@ -1004,8 +989,7 @@ public void run() {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr2, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr2);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 0988105

Please sign in to comment.