Skip to content

Commit

Permalink
fix: JsonWriter accepts string input for DATETIME, TIME, NUMERIC, BIG…
Browse files Browse the repository at this point in the history
…NUMERIC field (#1339)

* fix: update code comment to reflect max size change

* fix: JsonWriter support string DateTime, Time, Numeric, BigNumeric

* .

* .

* fix format

* remove a test that is covered by JsonToProtoMessageTest

* .

* .

* .

* remove v1 test that is failing due to test proto update, test coverage will be added when the additional type support is ported to v1

* .

* .

* .

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Oct 1, 2021
1 parent db2ca42 commit 691f078
Show file tree
Hide file tree
Showing 8 changed files with 624 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
builder.traceId);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
}

/**
Expand Down Expand Up @@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.pathtemplate.ValidationException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand All @@ -23,10 +24,14 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.UninitializedMessageException;
import java.math.BigDecimal;
import java.util.List;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.threeten.bp.LocalDateTime;
import org.threeten.bp.LocalTime;

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
Expand Down Expand Up @@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
}

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
*
* @param protoSchema
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
* NUMERIC, BIGNUMERIC
* @param json
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -71,9 +97,12 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
Descriptor protoSchema,
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
String[] jsonNames = JSONObject.getNames(json);
if (jsonNames == null) {
Expand All @@ -90,10 +119,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().equals(field.getName())) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
+ " has mismatch names ("
+ fieldSchema.getName()
+ ") ("
+ field.getName()
+ ")");
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
}
}

Expand Down Expand Up @@ -127,6 +171,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand All @@ -144,6 +189,25 @@ private static void fillField(
}
break;
case BYTES:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
}
}
if (val instanceof ByteString) {
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
Expand All @@ -170,6 +234,29 @@ private static void fillField(
}
break;
case INT64:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
}
}
if (val instanceof Integer) {
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
return;
Expand Down Expand Up @@ -206,6 +293,7 @@ private static void fillField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
Expand All @@ -232,6 +320,7 @@ private static void fillField(
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand Down Expand Up @@ -259,40 +348,81 @@ private static void fillRepeatedField(
}
break;
case BYTES:
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
Boolean added = false;
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
} else if (fieldSchema != null
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
}
if (!added) {
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
break;
case INT64:
if (val instanceof Integer) {
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (val instanceof Integer) {
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
Expand Down Expand Up @@ -330,6 +460,7 @@ private static void fillRepeatedField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ public void testStructComplex() throws Exception {
.setMode(TableFieldSchema.Mode.REQUIRED)
.setName("test_date")
.build();
final TableFieldSchema test_datetime =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.DATETIME)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_datetime")
.build();
final TableFieldSchema test_datetime_str =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.DATETIME)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("test_datetime_str")
.build();
final TableFieldSchema ComplexLvl2 =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRUCT)
Expand Down Expand Up @@ -203,12 +215,36 @@ public void testStructComplex() throws Exception {
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_time")
.build();
final TableFieldSchema TEST_TIME_STR =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.TIME)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_time_str")
.build();
final TableFieldSchema TEST_NUMERIC_REPEATED =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("test_numeric_repeated")
.build();
final TableFieldSchema TEST_NUMERIC_STR =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_numeric_str")
.build();
final TableFieldSchema TEST_BIGNUMERIC =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_bignumeric")
.build();
final TableFieldSchema TEST_BIGNUMERIC_STR =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("test_bignumeric_str")
.build();
final TableSchema tableSchema =
TableSchema.newBuilder()
.addFields(0, test_int)
Expand All @@ -217,13 +253,19 @@ public void testStructComplex() throws Exception {
.addFields(3, test_bool)
.addFields(4, test_double)
.addFields(5, test_date)
.addFields(6, ComplexLvl1)
.addFields(7, ComplexLvl2)
.addFields(8, TEST_NUMERIC)
.addFields(9, TEST_GEO)
.addFields(10, TEST_TIMESTAMP)
.addFields(11, TEST_TIME)
.addFields(12, TEST_NUMERIC_REPEATED)
.addFields(6, test_datetime)
.addFields(7, test_datetime_str)
.addFields(8, ComplexLvl1)
.addFields(9, ComplexLvl2)
.addFields(10, TEST_NUMERIC)
.addFields(11, TEST_GEO)
.addFields(12, TEST_TIMESTAMP)
.addFields(13, TEST_TIME)
.addFields(14, TEST_TIME_STR)
.addFields(15, TEST_NUMERIC_REPEATED)
.addFields(16, TEST_NUMERIC_STR)
.addFields(17, TEST_BIGNUMERIC)
.addFields(18, TEST_BIGNUMERIC_STR)
.build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
Expand Down
Loading

0 comments on commit 691f078

Please sign in to comment.