Skip to content

Commit

Permalink
fix: add string to DATETIME, TIME, NUMERIC, BIGNUMERIC support in Jso…
Browse files Browse the repository at this point in the history
…nStreamWriter v1 (#1345)

* fix: update code comment to reflect max size change

* fix: String to DATETIME, TIME, NUMERIC, BIGNUMERIC conversion in JsonWriter

* .

* .

* .
  • Loading branch information
yirutang committed Oct 4, 2021
1 parent 691f078 commit 9d272dd
Show file tree
Hide file tree
Showing 4 changed files with 498 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
builder.traceId);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
}

/**
Expand Down Expand Up @@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.pathtemplate.ValidationException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand All @@ -23,10 +24,14 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.UninitializedMessageException;
import java.math.BigDecimal;
import java.util.List;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.threeten.bp.LocalDateTime;
import org.threeten.bp.LocalTime;

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
Expand Down Expand Up @@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
}

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
*
* @param protoSchema
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
* NUMERIC, BIGNUMERIC
* @param json
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -71,7 +97,11 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
Descriptor protoSchema,
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
Expand All @@ -90,10 +120,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().equals(field.getName())) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
+ " has mismatch names ("
+ fieldSchema.getName()
+ ") ("
+ field.getName()
+ ")");
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
}
}

Expand All @@ -119,6 +164,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
*
* @param protoMsg The protocol buffer message being constructed
* @param fieldDescriptor
* @param fieldSchema
* @param json
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
Expand All @@ -127,6 +173,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand All @@ -144,6 +191,25 @@ private static void fillField(
}
break;
case BYTES:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
}
}
if (val instanceof ByteString) {
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
Expand All @@ -170,6 +236,29 @@ private static void fillField(
}
break;
case INT64:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
}
}
if (val instanceof Integer) {
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
return;
Expand Down Expand Up @@ -206,6 +295,7 @@ private static void fillField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
Expand All @@ -224,6 +314,7 @@ private static void fillField(
*
* @param protoMsg The protocol buffer message being constructed
* @param fieldDescriptor
* @param fieldSchema
* @param json If root level has no matching fields, throws exception.
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
Expand All @@ -232,6 +323,7 @@ private static void fillField(
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand Down Expand Up @@ -259,40 +351,81 @@ private static void fillRepeatedField(
}
break;
case BYTES:
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
Boolean added = false;
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
} else if (fieldSchema != null
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
}
if (!added) {
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
break;
case INT64:
if (val instanceof Integer) {
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (val instanceof Integer) {
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
Expand Down Expand Up @@ -330,6 +463,7 @@ private static void fillRepeatedField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
Expand All @@ -42,6 +43,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;
import org.threeten.bp.LocalTime;

@RunWith(JUnit4.class)
public class JsonStreamWriterTest {
Expand Down Expand Up @@ -195,6 +197,56 @@ public void testSingleAppendSimpleJson() throws Exception {
}
}

@Test
public void testSpecialTypeAppend() throws Exception {
TableFieldSchema field =
TableFieldSchema.newBuilder()
.setName("time")
.setType(TableFieldSchema.Type.TIME)
.setMode(TableFieldSchema.Mode.REPEATED)
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(field).build();

JsonTest.TestTime expectedProto =
JsonTest.TestTime.newBuilder()
.addTime(CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.of(1, 0, 1)))
.build();
JSONObject foo = new JSONObject();
foo.put("time", new JSONArray(new String[] {"01:00:01"}));
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
expectedProto.toByteString());
}
}

@Test
public void testSingleAppendMultipleSimpleJson() throws Exception {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
Expand Down
Loading

0 comments on commit 9d272dd

Please sign in to comment.