From acac9c04301594c0876c1a14610d0319ea5fda38 Mon Sep 17 00:00:00 2001 From: Artur Owczarek Date: Mon, 20 Mar 2023 11:07:46 +0100 Subject: [PATCH] feat: add schema aware stream writer --- .../cloud/bigquery/storage/v1/Exceptions.java | 14 +- .../bigquery/storage/v1/JsonStreamWriter.java | 269 ++------ .../storage/v1/JsonToProtoMessage.java | 37 +- .../storage/v1/SchemaAwareStreamWriter.java | 651 ++++++++++++++++++ .../bigquery/storage/v1/ToProtoConverter.java | 13 + .../storage/v1/JsonToProtoMessageTest.java | 94 +-- .../v1beta2/JsonToProtoMessageTest.java | 2 +- 7 files changed, 792 insertions(+), 288 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 4bc0d97e0f..86362ecf75 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -216,7 +216,7 @@ public static StorageException toStorageException(Throwable exception) { } /** - * This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to + * This exception is thrown from {@link SchemaAwareStreamWriter#append()} when the client side Json to * Proto serialization fails. It can also be thrown by the server in case rows contain invalid * data. The exception contains a Map of indexes of faulty rows and the corresponding error * message. @@ -348,15 +348,15 @@ protected InflightBytesLimitExceededException(String writerId, long currentLimit } } /** - * Input Json data has unknown field to the schema of the JsonStreamWriter. User can either turn - * on IgnoreUnknownFields option on the JsonStreamWriter, or if they don't want the error to be - * ignored, they should recreate the JsonStreamWriter with the updated table schema. + * Input data object has a field unknown to the schema of the SchemaAwareStreamWriter. Users can either turn + * on the IgnoreUnknownFields option on the SchemaAwareStreamWriter, or if they don't want the error to be + * ignored, they should recreate the SchemaAwareStreamWriter with the updated table schema.
*/ - public static final class JsonDataHasUnknownFieldException extends IllegalArgumentException { + public static final class DataHasUnknownFieldException extends IllegalArgumentException { private final String jsonFieldName; - protected JsonDataHasUnknownFieldException(String jsonFieldName) { - super(String.format("JSONObject has fields unknown to BigQuery: %s.", jsonFieldName)); + public DataHasUnknownFieldException(String jsonFieldName) { + super(String.format("Source object has fields unknown to BigQuery: %s.", jsonFieldName)); this.jsonFieldName = jsonFieldName; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index edf40c1e64..a31b49babf 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -20,22 +20,13 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Descriptors.DescriptorValidationException; -import com.google.protobuf.Message; -import com.google.rpc.Code; import java.io.IOException; -import java.util.HashMap; import java.util.Map; -import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.annotation.Nullable; import org.json.JSONArray; -import org.json.JSONObject; /** * A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is @@ -46,24 +37,7 @@ * order of minutes). 
*/ public class JsonStreamWriter implements AutoCloseable { - private static String streamPatternString = - "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; - private static Pattern streamPattern = Pattern.compile(streamPatternString); - private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName()); - private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L; - - private BigQueryWriteClient client; - private String streamName; - private StreamWriter streamWriter; - private StreamWriter.Builder streamWriterBuilder; - private Descriptor descriptor; - private TableSchema tableSchema; - private boolean ignoreUnknownFields = false; - private boolean reconnectAfter10M = false; - private long totalMessageSize = 0; - private long absTotal = 0; - private ProtoSchema protoSchema; - private boolean enableConnectionPool = false; + private SchemaAwareStreamWriter schemaAwareStreamWriter; /** * Constructs the JsonStreamWriter * * @param builder The Builder object for the JsonStreamWriter */ private JsonStreamWriter(Builder builder) - throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, + throws Descriptors.DescriptorValidationException, + IllegalArgumentException, + IOException, InterruptedException { - this.descriptor = - BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); - - if (builder.client == null) { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamName); - } else { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); - } - this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); - this.totalMessageSize = protoSchema.getSerializedSize(); - this.client = builder.client; - streamWriterBuilder.setWriterSchema(protoSchema); - setStreamWriterSettings( - builder.channelProvider, - builder.credentialsProvider, - builder.executorProvider, - builder.endpoint, - builder.flowControlSettings, - builder.traceId); - streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); - streamWriterBuilder.setLocation(builder.location); - this.streamWriter = streamWriterBuilder.build(); - this.streamName = builder.streamName; - this.tableSchema = builder.tableSchema; - this.ignoreUnknownFields = builder.ignoreUnknownFields; - this.reconnectAfter10M = builder.reconnectAfter10M; + // Forward all JsonStreamWriter settings to the underlying SchemaAwareStreamWriter. + this.schemaAwareStreamWriter = + SchemaAwareStreamWriter.newBuilder( + builder.getStreamName(), + builder.tableSchema, + JsonToProtoMessage.INSTANCE) + .setChannelProvider(builder.channelProvider) + .setCredentialsProvider(builder.credentialsProvider) + .setExecutorProvider(builder.executorProvider) + .setFlowControlSettings(builder.flowControlSettings) + .setEndpoint(builder.endpoint) + .setTraceId(builder.traceId) + .setIgnoreUnknownFields(builder.ignoreUnknownFields) + .setReconnectAfter10M(builder.reconnectAfter10M) + .setEnableConnectionPool(builder.enableConnectionPool) + .setLocation(builder.location) + .build(); } /** @@ -112,64 +80,10 @@ private JsonStreamWriter(Builder builder) * ApiFuture */ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) - throws IOException, DescriptorValidationException { - return append(jsonArr, -1); - } - - private void refreshWriter(TableSchema updatedSchema) - throws DescriptorValidationException, IOException { -
Preconditions.checkNotNull(updatedSchema, "updatedSchema is null."); - LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName); - // Close the StreamWriterf - this.streamWriter.close(); - // Update JsonStreamWriter's TableSchema and Descriptor - this.tableSchema = updatedSchema; - this.descriptor = - BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); - this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); - this.totalMessageSize = protoSchema.getSerializedSize(); - // Create a new underlying StreamWriter with the updated TableSchema and Descriptor - this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); + throws IOException, Descriptors.DescriptorValidationException { + return this.schemaAwareStreamWriter.append(jsonArr); } - private Message buildMessage(JSONObject json) - throws InterruptedException, DescriptorValidationException, IOException { - try { - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } catch (Exceptions.JsonDataHasUnknownFieldException ex) { - // Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before - // trying to get the table schema to increase the chance of succeed. This is to avoid - // client's invalid datfa caused storm of GetWriteStream. - LOG.warning( - "Saw Json unknown field " - + ex.getFieldName() - + ", try to refresh the writer with updated schema, stream: " - + streamName); - GetWriteStreamRequest writeStreamRequest = - GetWriteStreamRequest.newBuilder() - .setName(this.streamName) - .setView(WriteStreamView.FULL) - .build(); - WriteStream writeStream = client.getWriteStream(writeStreamRequest); - refreshWriter(writeStream.getTableSchema()); - try { - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } catch (Exceptions.JsonDataHasUnknownFieldException exex) { - LOG.warning( - "First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName); - Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS); - writeStream = client.getWriteStream(writeStreamRequest); - // TODO(yiru): We should let TableSchema return a timestamp so that we can simply - // compare the timestamp to see if the table schema is the same. If it is the - // same, we don't need to go refresh the writer again. - refreshWriter(writeStream.getTableSchema()); - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } - } - } /** * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON * data to protobuf messages, then using StreamWriter's append() to write the data at the @@ -182,68 +96,19 @@ private Message buildMessage(JSONObject json) * ApiFuture */ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) - throws IOException, DescriptorValidationException { - // Handle schema updates in a Thread-safe way by locking down the operation - synchronized (this) { - // Create a new stream writer internally if a new updated schema is reported from backend. - if (this.streamWriter.getUpdatedSchema() != null) { - refreshWriter(this.streamWriter.getUpdatedSchema()); - } - - ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); - // Any error in convertJsonToProtoMessage will throw an - // IllegalArgumentException/IllegalStateException/NullPointerException.
- // IllegalArgumentException will be collected into a Map of row indexes to error messages. - // After the conversion is finished an AppendSerializtionError exception that contains all the - // conversion errors will be thrown. - long currentRequestSize = 0; - Map<Integer, String> rowIndexToErrorMessage = new HashMap<>(); - for (int i = 0; i < jsonArr.length(); i++) { - JSONObject json = jsonArr.getJSONObject(i); - try { - Message protoMessage = buildMessage(json); - rowsBuilder.addSerializedRows(protoMessage.toByteString()); - currentRequestSize += protoMessage.getSerializedSize(); - } catch (IllegalArgumentException exception) { - if (exception instanceof Exceptions.FieldParseError) { - Exceptions.FieldParseError ex = (Exceptions.FieldParseError) exception; - rowIndexToErrorMessage.put( - i, - "Field " - + ex.getFieldName() - + " failed to convert to " - + ex.getBqType() - + ". Error: " - + ex.getCause().getMessage()); - } else { - rowIndexToErrorMessage.put(i, exception.getMessage()); - } - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - - if (!rowIndexToErrorMessage.isEmpty()) { - throw new AppendSerializtionError( - Code.INVALID_ARGUMENT.getNumber(), - "Append serialization failed for writer: " + streamName, - streamName, - rowIndexToErrorMessage); - } - final ApiFuture<AppendRowsResponse> appendResponseFuture = - this.streamWriter.append(rowsBuilder.build(), offset); - return appendResponseFuture; - } + throws IOException, Descriptors.DescriptorValidationException { + return this.schemaAwareStreamWriter.append(jsonArr, offset); } - /** @return The name of the write stream associated with this writer. */ public String getStreamName() { - return this.streamName; + return this.schemaAwareStreamWriter.getStreamName(); } - /** @return A unique Id for this writer. */ + /** + * @return A unique Id for this writer. + */ public String getWriterId() { - return streamWriter.getWriterId(); + return this.schemaAwareStreamWriter.getWriterId(); } /** @@ -251,8 +116,8 @@ public String getWriterId() { * * @return Descriptor */ - public Descriptor getDescriptor() { - return this.descriptor; + public Descriptors.Descriptor getDescriptor() { + return this.schemaAwareStreamWriter.getDescriptor(); } /** @@ -261,7 +126,7 @@ public Descriptor getDescriptor() { * @return the location of the destination */ public String getLocation() { - return this.streamWriter.getLocation(); + return this.schemaAwareStreamWriter.getLocation(); } /** @@ -272,7 +137,7 @@ public String getLocation() { * the throughput in exclusive stream case, or create a new Writer in the default stream case. */ public long getInflightWaitSeconds() { - return streamWriter.getInflightWaitSeconds(); + return this.schemaAwareStreamWriter.getInflightWaitSeconds(); } /** @@ -284,54 +149,15 @@ public long getInflightWaitSeconds() { */ public void setMissingValueInterpretationMap( Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) { - streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); + this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); } - /** @return the missing value interpretation map used for the writer. */ + /** + * @return the missing value interpretation map used for the writer. + */ public Map<String, AppendRowsRequest.MissingValueInterpretation> getMissingValueInterpretationMap() { - return streamWriter.getMissingValueInterpretationMap(); - } - - /** Sets all StreamWriter settings.
*/ - private void setStreamWriterSettings( - @Nullable TransportChannelProvider channelProvider, - @Nullable CredentialsProvider credentialsProvider, - @Nullable ExecutorProvider executorProvider, - @Nullable String endpoint, - @Nullable FlowControlSettings flowControlSettings, - @Nullable String traceId) { - if (channelProvider != null) { - streamWriterBuilder.setChannelProvider(channelProvider); - } - if (credentialsProvider != null) { - streamWriterBuilder.setCredentialsProvider(credentialsProvider); - } - if (executorProvider != null) { - streamWriterBuilder.setExecutorProvider(executorProvider); - } - if (endpoint != null) { - streamWriterBuilder.setEndpoint(endpoint); - } - if (traceId != null) { - streamWriterBuilder.setTraceId("JsonWriter_" + traceId); - } else { - streamWriterBuilder.setTraceId("JsonWriter:null"); - } - if (flowControlSettings != null) { - if (flowControlSettings.getMaxOutstandingRequestBytes() != null) { - streamWriterBuilder.setMaxInflightBytes( - flowControlSettings.getMaxOutstandingRequestBytes()); - } - if (flowControlSettings.getMaxOutstandingElementCount() != null) { - streamWriterBuilder.setMaxInflightRequests( - flowControlSettings.getMaxOutstandingElementCount()); - } - if (flowControlSettings.getLimitExceededBehavior() != null) { - streamWriterBuilder.setLimitExceededBehavior( - flowControlSettings.getLimitExceededBehavior()); - } - } + return this.schemaAwareStreamWriter.getMissingValueInterpretationMap(); } /** @@ -394,10 +220,9 @@ public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient c return new Builder(streamOrTableName, null, client); } - /** Closes the underlying StreamWriter. */ @Override public void close() { - this.streamWriter.close(); + this.schemaAwareStreamWriter.close(); } /** @@ -406,12 +231,14 @@ public void close() { * connection pool is not used. Client should recreate JsonStreamWriter in this case. */ public boolean isClosed() { - return this.streamWriter.isClosed(); + return this.schemaAwareStreamWriter.isClosed(); } - /** @return if user explicitly closed the writer. */ + /** + * @return if user explicitly closed the writer. + */ public boolean isUserClosed() { - return this.streamWriter.isUserClosed(); + return this.schemaAwareStreamWriter.isUserClosed(); } public static final class Builder { @@ -618,7 +445,9 @@ public Builder setLocation(String location) { * @return JsonStreamWriter */ public JsonStreamWriter build() - throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, + throws Descriptors.DescriptorValidationException, + IllegalArgumentException, + IOException, InterruptedException { return new JsonStreamWriter(this); } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java index c402d66f54..e653b5a7e3 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java @@ -48,7 +48,8 @@ * Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf * descriptor must have all fields lowercased. 
*/ -public class JsonToProtoMessage { +public final class JsonToProtoMessage implements ToProtoConverter { + public static final JsonToProtoMessage INSTANCE = new JsonToProtoMessage(); private static final Logger LOG = Logger.getLogger(JsonToProtoMessage.class.getName()); private static int NUMERIC_SCALE = 9; private static ImmutableMap<FieldDescriptor.Type, String> FieldTypeToDebugMessage = @@ -102,6 +103,16 @@ public class JsonToProtoMessage { .toFormatter() .withZone(ZoneOffset.UTC); + private JsonToProtoMessage() {} + + /** + * Converts an input object to a protobuf message given the proto descriptor and the BigQuery + * table schema. The input object is expected to be a {@link JSONObject}. + */ + @Override + public DynamicMessage convertToProtoMessage( + Descriptor protoSchema, TableSchema tableSchema, Object json, boolean ignoreUnknownFields) { + return convertToProtoMessage(protoSchema, tableSchema, (JSONObject) json, ignoreUnknownFields); + } + /** * Converts Json data to protocol buffer messages given the protocol buffer descriptor. * * @param protoSchema * @param json * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, JSONObject json) + public DynamicMessage convertToProtoMessage(Descriptor protoSchema, JSONObject json) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( + return convertToProtoMessage( protoSchema, null, json, "root", /*topLevel=*/ true, false); } @@ -128,7 +139,7 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J * @param json * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage( + public DynamicMessage convertToProtoMessage( Descriptor protoSchema, TableSchema tableSchema, JSONObject json) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); @@ -136,7 +147,7 @@ public static DynamicMessage convertJsonToProtoMessage( Preconditions.checkNotNull(tableSchema, "TableSchema is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( + return convertToProtoMessage( protoSchema, tableSchema.getFieldsList(), json, @@ -155,7 +166,7 @@ public static DynamicMessage convertJsonToProtoMessage( * @param ignoreUnknownFields allows unknown fields in JSON input to be ignored. * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage( + public DynamicMessage convertToProtoMessage( Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); @@ -163,7 +174,7 @@ public static DynamicMessage convertJsonToProtoMessage( Preconditions.checkNotNull(tableSchema, "TableSchema is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( + return convertToProtoMessage( protoSchema, tableSchema.getFieldsList(), json, @@ -181,7 +192,7 @@ public static DynamicMessage convertJsonToProtoMessage( * @param topLevel checks if root level has any matching fields. * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/ - private static DynamicMessage convertJsonToProtoMessageImpl( + private DynamicMessage convertToProtoMessage( Descriptor protoSchema, List<TableFieldSchema> tableSchema, JSONObject json, @@ -209,7 +220,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl( String currentScope = jsonScope + "." + jsonName; FieldDescriptor field = protoSchema.findFieldByName(jsonFieldLocator); if (field == null && !ignoreUnknownFields) { - throw new Exceptions.JsonDataHasUnknownFieldException(currentScope); + throw new Exceptions.DataHasUnknownFieldException(currentScope); } else if (field == null) { continue; } @@ -274,7 +285,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl( * @param currentScope Debugging purposes * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - private static void fillField( + private void fillField( DynamicMessage.Builder protoMsg, FieldDescriptor fieldDescriptor, TableFieldSchema fieldSchema, @@ -482,7 +493,7 @@ private static void fillField( Message.Builder message = protoMsg.newBuilderForField(fieldDescriptor); protoMsg.setField( fieldDescriptor, - convertJsonToProtoMessageImpl( + convertToProtoMessage( fieldDescriptor.getMessageType(), fieldSchema == null ? null : fieldSchema.getFieldsList(), json.getJSONObject(exactJsonKeyName), @@ -510,7 +521,7 @@ private static void fillField( * @param currentScope Debugging purposes * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - private static void fillRepeatedField( + private void fillRepeatedField( DynamicMessage.Builder protoMsg, FieldDescriptor fieldDescriptor, TableFieldSchema fieldSchema, @@ -747,7 +758,7 @@ private static void fillRepeatedField( Message.Builder message = protoMsg.newBuilderForField(fieldDescriptor); protoMsg.addRepeatedField( fieldDescriptor, - convertJsonToProtoMessageImpl( + convertToProtoMessage( fieldDescriptor.getMessageType(), fieldSchema == null ? null : fieldSchema.getFieldsList(), jsonArray.getJSONObject(i), diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java new file mode 100644 index 0000000000..be8120d0e8 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -0,0 +1,651 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; +import com.google.common.base.Preconditions; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Message; +import com.google.rpc.Code; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +/** + * A StreamWriter that can write data to BigQuery tables. The SchemaAwareStreamWriter is built on top + * of a StreamWriter, and it simply converts all data to protobuf messages using the provided converter + * and then calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter + * functions, but also provides an additional feature: schema update support, where if the BigQuery + * table schema is updated, users will be able to ingest data on the new schema after some time (in + * order of minutes). + */ +public class SchemaAwareStreamWriter<T> implements AutoCloseable { + private static String streamPatternString = + "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; + private static Pattern streamPattern = Pattern.compile(streamPatternString); + private static final Logger LOG = Logger.getLogger(SchemaAwareStreamWriter.class.getName()); + private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L; + + private BigQueryWriteClient client; + private String streamName; + private StreamWriter streamWriter; + private StreamWriter.Builder streamWriterBuilder; + private Descriptor descriptor; + private TableSchema tableSchema; + private boolean ignoreUnknownFields = false; + private boolean reconnectAfter10M = false; + private long totalMessageSize = 0; + private long absTotal = 0; + private ProtoSchema protoSchema; + private boolean enableConnectionPool = false; + private ToProtoConverter<T> toProtoConverter; + + /** + * Constructs the SchemaAwareStreamWriter + * + * @param builder The Builder object for the SchemaAwareStreamWriter + */ + private SchemaAwareStreamWriter(Builder<T> builder) + throws DescriptorValidationException, + IllegalArgumentException, + IOException, + InterruptedException { + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); + + if (builder.client == null) { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName); + } else { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); + } + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); + this.client = builder.client; + streamWriterBuilder.setWriterSchema(protoSchema); + setStreamWriterSettings( + builder.channelProvider, + builder.credentialsProvider, + builder.executorProvider, + builder.endpoint, + builder.flowControlSettings, + builder.traceId); + streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); + streamWriterBuilder.setLocation(builder.location); + this.streamWriter = streamWriterBuilder.build(); + this.streamName = builder.streamName; +
this.tableSchema = builder.tableSchema; + this.toProtoConverter = builder.toProtoConverter; + this.ignoreUnknownFields = builder.ignoreUnknownFields; + this.reconnectAfter10M = builder.reconnectAfter10M; + } + + /** + * Writes a collection that contains objects to the BigQuery table by first converting the data + * to protobuf messages, then using StreamWriter's append() to write the data at the current end + * of stream. If there is a schema update, the current StreamWriter is closed. A new StreamWriter + * is created with the updated TableSchema. + * + * @param items The collection that contains objects to be written + * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture<AppendRowsResponse> append(Iterable<T> items) + throws IOException, DescriptorValidationException { + return append(items, -1); + } + + private void refreshWriter(TableSchema updatedSchema) + throws DescriptorValidationException, IOException { + Preconditions.checkNotNull(updatedSchema, "updatedSchema is null."); + LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName); + // Close the current StreamWriter + this.streamWriter.close(); + // Update SchemaAwareStreamWriter's TableSchema and Descriptor + this.tableSchema = updatedSchema; + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); + // Create a new underlying StreamWriter with the updated TableSchema and Descriptor + this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); + } + + private Message buildMessage(T item) + throws InterruptedException, DescriptorValidationException, IOException { + try { + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } catch (Exceptions.DataHasUnknownFieldException ex) { + // The backend cache of the GetWriteStream schema can be up to 30 seconds stale, so wait a + // bit before fetching the table schema to increase the chance of success. This avoids a + // storm of GetWriteStream calls caused by a client's invalid data. + LOG.warning( + "Saw unknown field " + + ex.getFieldName() + + ", try to refresh the writer with updated schema, stream: " + + streamName); + GetWriteStreamRequest writeStreamRequest = + GetWriteStreamRequest.newBuilder() + .setName(this.streamName) + .setView(WriteStreamView.FULL) + .build(); + WriteStream writeStream = client.getWriteStream(writeStreamRequest); + refreshWriter(writeStream.getTableSchema()); + try { + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } catch (Exceptions.DataHasUnknownFieldException exex) { + LOG.warning( + "First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName); + Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS); + writeStream = client.getWriteStream(writeStreamRequest); + // TODO(yiru): We should let TableSchema return a timestamp so that we can simply + // compare the timestamp to see if the table schema is the same. If it is the + // same, we don't need to go refresh the writer again.
+ refreshWriter(writeStream.getTableSchema()); + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } + } + } + /** + * Writes a collection that contains objects to the BigQuery table by first converting the + * data to protobuf messages, then using StreamWriter's append() to write the data at the + * specified offset. If there is a schema update, the current StreamWriter is closed. A new + * StreamWriter is created with the updated TableSchema. + * + * @param items The collection that contains objects to be written + * @param offset Offset for deduplication + * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture<AppendRowsResponse> append(Iterable<T> items, long offset) + throws IOException, DescriptorValidationException { + // Handle schema updates in a Thread-safe way by locking down the operation + synchronized (this) { + // Create a new stream writer internally if a new updated schema is reported from backend. + if (this.streamWriter.getUpdatedSchema() != null) { + refreshWriter(this.streamWriter.getUpdatedSchema()); + } + + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + // Any error in convertToProtoMessage will throw an + // IllegalArgumentException/IllegalStateException/NullPointerException. + // IllegalArgumentException will be collected into a Map of row indexes to error messages. + // After the conversion is finished an AppendSerializtionError exception that contains all the + // conversion errors will be thrown. + long currentRequestSize = 0; + Map<Integer, String> rowIndexToErrorMessage = new HashMap<>(); + int i = -1; + for (T item : items) { + i += 1; + try { + Message protoMessage = buildMessage(item); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); + currentRequestSize += protoMessage.getSerializedSize(); + } catch (IllegalArgumentException exception) { + if (exception instanceof Exceptions.FieldParseError) { + Exceptions.FieldParseError ex = (Exceptions.FieldParseError) exception; + rowIndexToErrorMessage.put( + i, + "Field " + + ex.getFieldName() + + " failed to convert to " + + ex.getBqType() + + ". Error: " + + ex.getCause().getMessage()); + } else { + rowIndexToErrorMessage.put(i, exception.getMessage()); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + if (!rowIndexToErrorMessage.isEmpty()) { + throw new AppendSerializtionError( + Code.INVALID_ARGUMENT.getNumber(), + "Append serialization failed for writer: " + streamName, + streamName, + rowIndexToErrorMessage); + } + return this.streamWriter.append(rowsBuilder.build(), offset); + } + } + + /** + * @return The name of the write stream associated with this writer. + */ + public String getStreamName() { + return this.streamName; + } + + /** + * @return A unique Id for this writer. + */ + public String getWriterId() { + return streamWriter.getWriterId(); + } + + /** + * Gets current descriptor + * + * @return Descriptor + */ + public Descriptor getDescriptor() { + return this.descriptor; + } + + /** + * Gets the location of the destination + * + * @return the location of the destination + */ + public String getLocation() { + return this.streamWriter.getLocation(); + } + + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflight request limit (adjustable when + * constructing the Writer). The value is the wait time for the last sent request.
A constant high + * wait value indicates a need for more throughput; you can create a new Stream to increase + * the throughput in the exclusive stream case, or create a new Writer in the default stream case. + */ + public long getInflightWaitSeconds() { + return streamWriter.getInflightWaitSeconds(); + } + + /** + * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * SchemaAwareStreamWriter. + */ + public void setMissingValueInterpretationMap( + Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) { + streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); + } + + /** + * @return the missing value interpretation map used for the writer. + */ + public Map<String, AppendRowsRequest.MissingValueInterpretation> + getMissingValueInterpretationMap() { + return streamWriter.getMissingValueInterpretationMap(); + } + + /** Sets all StreamWriter settings. */ + private void setStreamWriterSettings( + @Nullable TransportChannelProvider channelProvider, + @Nullable CredentialsProvider credentialsProvider, + @Nullable ExecutorProvider executorProvider, + @Nullable String endpoint, + @Nullable FlowControlSettings flowControlSettings, + @Nullable String traceId) { + if (channelProvider != null) { + streamWriterBuilder.setChannelProvider(channelProvider); + } + if (credentialsProvider != null) { + streamWriterBuilder.setCredentialsProvider(credentialsProvider); + } + if (executorProvider != null) { + streamWriterBuilder.setExecutorProvider(executorProvider); + } + if (endpoint != null) { + streamWriterBuilder.setEndpoint(endpoint); + } + if (traceId != null) { + streamWriterBuilder.setTraceId("SchemaAwareStreamWriter_" + traceId); + } else { + streamWriterBuilder.setTraceId("SchemaAwareStreamWriter:null"); + } + if (flowControlSettings != null) { + if (flowControlSettings.getMaxOutstandingRequestBytes() != null) { + streamWriterBuilder.setMaxInflightBytes( + flowControlSettings.getMaxOutstandingRequestBytes()); + } + if (flowControlSettings.getMaxOutstandingElementCount() != null) { + streamWriterBuilder.setMaxInflightRequests( + flowControlSettings.getMaxOutstandingElementCount()); + } + if (flowControlSettings.getLimitExceededBehavior() != null) { + streamWriterBuilder.setLimitExceededBehavior( + flowControlSettings.getLimitExceededBehavior()); + } + } + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder with BigQuery client being initialized by + * StreamWriter by default. + * + * *
<p>
The table schema passed in will be updated automatically when there is a schema update + * event. When used for Writer creation, it should be the latest schema. So when you are trying to + * reuse a stream, you should use newBuilder(String streamOrTableName, BigQueryWriteClient client, + * ToProtoConverter toProtoConverter) instead, so the created Writer will be based on a fresh schema. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or table name + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @return Builder + */ + public static <T> Builder<T> newBuilder( + String streamOrTableName, TableSchema tableSchema, ToProtoConverter<T> toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, tableSchema, null, toProtoConverter); + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder. + * + * *
<p>
The table schema passed in will be updated automatically when there is a schema update + * event. When used for Writer creation, it should be the latest schema. So when you are trying to + * reuse a stream, you should use newBuilder(String streamOrTableName, BigQueryWriteClient client, + * ToProtoConverter toProtoConverter) instead, so the created Writer will be based on a fresh schema. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @param client + * @return Builder + */ + public static <T> Builder<T> newBuilder( + String streamOrTableName, + TableSchema tableSchema, + BigQueryWriteClient client, + ToProtoConverter<T> toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkNotNull(client, "BigQuery client is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, tableSchema, client, toProtoConverter); + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder with TableSchema being initialized by + * StreamWriter by default. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param client BigQueryWriteClient + * @return Builder + */ + public static <T> Builder<T> newBuilder( + String streamOrTableName, BigQueryWriteClient client, ToProtoConverter<T> toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(client, "BigQuery client is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, null, client, toProtoConverter); + } + + /** Closes the underlying StreamWriter. */ + @Override + public void close() { + this.streamWriter.close(); + } + + /** + * @return if a writer can no longer be used for writing. It is due to either the + * SchemaAwareStreamWriter is explicitly closed or the underlying connection is broken when + * connection pool is not used. Client should recreate SchemaAwareStreamWriter in this case. + */ + public boolean isClosed() { + return this.streamWriter.isClosed(); + } + + /** + * @return if user explicitly closed the writer. + */ + public boolean isUserClosed() { + return this.streamWriter.isUserClosed(); + } + + public static final class Builder<T> { + private String streamName; + private BigQueryWriteClient client; + private TableSchema tableSchema; + + private ToProtoConverter<T> toProtoConverter; + private TransportChannelProvider channelProvider; + private CredentialsProvider credentialsProvider; + private ExecutorProvider executorProvider; + private FlowControlSettings flowControlSettings; + private String endpoint; + private boolean createDefaultStream = false; + private String traceId; + private boolean ignoreUnknownFields = false; + private boolean reconnectAfter10M = false; + // Indicates whether multiplexing mode is enabled.
+ private boolean enableConnectionPool = false; + private String location; + + private static String streamPatternString = + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; + private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; + + private static Pattern streamPattern = Pattern.compile(streamPatternString); + private static Pattern tablePattern = Pattern.compile(tablePatternString); + + /** + * Constructor for SchemaAwareStreamWriter's Builder + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+" + * @param tableSchema schema used to convert items to proto messages. + * @param client + * @param toProtoConverter converter used to convert items to proto messages + */ + private Builder( + String streamOrTableName, + TableSchema tableSchema, + BigQueryWriteClient client, + ToProtoConverter<T> toProtoConverter) { + Matcher streamMatcher = streamPattern.matcher(streamOrTableName); + if (!streamMatcher.matches()) { + Matcher tableMatcher = tablePattern.matcher(streamOrTableName); + if (!tableMatcher.matches()) { + throw new IllegalArgumentException("Invalid name: " + streamOrTableName); + } else { + this.streamName = streamOrTableName + "/_default"; + } + } else { + this.streamName = streamOrTableName; + } + this.client = client; + if (tableSchema == null) { + GetWriteStreamRequest writeStreamRequest = + GetWriteStreamRequest.newBuilder() + .setName(this.getStreamName()) + .setView(WriteStreamView.FULL) + .build(); + + WriteStream writeStream = this.client.getWriteStream(writeStreamRequest); + TableSchema writeStreamTableSchema = writeStream.getTableSchema(); + + this.tableSchema = writeStreamTableSchema; + this.location = writeStream.getLocation(); + } else { + this.tableSchema = tableSchema; + } + this.toProtoConverter = toProtoConverter; + } + + /** + * Setter for the underlying StreamWriter's TransportChannelProvider. + * + * @param channelProvider + * @return Builder + */ + public Builder<T> setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = + Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's CredentialsProvider. + * + * @param credentialsProvider + * @return Builder + */ + public Builder<T> setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = + Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's ExecutorProvider. + * + * @param executorProvider + * @return Builder + */ + public Builder<T> setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's FlowControlSettings. + * + * @param flowControlSettings + * @return Builder + */ + public Builder<T> setFlowControlSettings(FlowControlSettings flowControlSettings) { + this.flowControlSettings = + Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null."); + return this; + } + + /** + * Stream name on the builder. + * + * @return streamName + */ + public String getStreamName() { + return streamName; + } + + /** + * Setter for the underlying StreamWriter's Endpoint.
+ * + * @param endpoint + * @return Builder + */ + public Builder<T> setEndpoint(String endpoint) { + this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + return this; + } + + /** + * Setter for a traceId to help identify traffic origin. + * + * @param traceId + * @return Builder + */ + public Builder<T> setTraceId(String traceId) { + this.traceId = Preconditions.checkNotNull(traceId, "TraceId is null."); + return this; + } + + /** + * Setter for ignoreUnknownFields; if true, fields unknown to BigQuery will be ignored + * instead of erroring out. + * + * @param ignoreUnknownFields + * @return Builder + */ + public Builder<T> setIgnoreUnknownFields(boolean ignoreUnknownFields) { + this.ignoreUnknownFields = ignoreUnknownFields; + return this; + } + + /** + * @Deprecated Setter for reconnectAfter10M, a temporary workaround for omg/48020. Fix for the + * omg is supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be + * slower (0.75MB/s per connection), but your writes will not be stuck as a symptom of + * omg/48020. + * + * @param reconnectAfter10M + * @return Builder + */ + public Builder<T> setReconnectAfter10M(boolean reconnectAfter10M) { + this.reconnectAfter10M = false; + return this; + } + + /** + * Enable multiplexing for this writer. In multiplexing mode tables will share the same + * connection if possible until the connection is overwhelmed. This feature is still under + * development, please contact the write api team before using. + * + * @param enableConnectionPool + * @return Builder + */ + public Builder<T> setEnableConnectionPool(boolean enableConnectionPool) { + this.enableConnectionPool = enableConnectionPool; + return this; + } + + /** + * Location of the table this stream writer is targeting. Connection pools are shared by + * location.
+ * + * @param location + * @return Builder + */ + public Builder<T> setLocation(String location) { + if (this.location != null && !this.location.equals(location)) { + throw new IllegalArgumentException( + "Specified location " + location + " does not match the system value " + this.location); + } + this.location = location; + return this; + } + + /** + * Builds SchemaAwareStreamWriter + * + * @return SchemaAwareStreamWriter + */ + public SchemaAwareStreamWriter<T> build() + throws DescriptorValidationException, + IllegalArgumentException, + IOException, + InterruptedException { + return new SchemaAwareStreamWriter<>(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java new file mode 100644 index 0000000000..254ce2e6dc --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java @@ -0,0 +1,13 @@ +package com.google.cloud.bigquery.storage.v1; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.json.JSONObject; + +public interface ToProtoConverter<T> { + DynamicMessage convertToProtoMessage( + Descriptors.Descriptor protoSchema, + TableSchema tableSchema, + T json, + boolean ignoreUnknownFields); +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java index 91785ce0ec..32d57af488 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java @@ -541,7 +541,7 @@ public void testDifferentNameCasing() throws Exception { json.put("inT", 1); json.put("lONg", 1L); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -554,7 +554,7 @@ public void testBool() throws Exception { json.put("uppercase", "TRUE"); json.put("lowercase", "false"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestBool.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBool.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -569,7 +569,7 @@ public void testInt64() throws Exception { json.put("long", 1L); json.put("string", "1"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -583,7 +583,7 @@ public void testInt32() throws Exception { json.put("int", 1); json.put("string", 1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -595,7 +595,7 @@ public void testInt32NotMatchInt64() throws Exception { json.put("int", 1L); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json); +
JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals("JSONObject does not have a int32 field at root.int.", e.getMessage()); @@ -615,7 +615,7 @@ public void testDateTimeMismatch() throws Exception { json.put("datetime", 1.0); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestDatetime.getDescriptor(), tableSchema, json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { @@ -636,7 +636,7 @@ public void testTimeMismatch() throws Exception { json.put("time", new JSONArray(new Double[] {1.0})); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestTime.getDescriptor(), tableSchema, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestTime.getDescriptor(), tableSchema, json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals("JSONObject does not have a int64 field at root.time[0].", e.getMessage()); @@ -657,7 +657,7 @@ public void testMixedCaseFieldNames() throws Exception { json.put("fooBar", "hello"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestMixedCaseFieldNames.getDescriptor(), tableSchema, json); } @@ -682,7 +682,7 @@ public void testDouble() throws Exception { json.put("long", 8L); json.put("string", "9.1"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestDouble.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestDouble.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -705,7 +705,7 @@ public void testDoubleHighPrecision() throws Exception { JSONObject json = new JSONObject(); json.put("numeric", 3.400500512978076); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestNumeric.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -735,7 +735,7 @@ public void testDoubleHighPrecision_RepeatedField() throws Exception { JSONObject json = new JSONObject(); json.put("bignumeric", ImmutableList.of(3.400500512978076, 0.10000000000055, 0.12)); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestBignumeric.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -775,7 +775,7 @@ public void testTimestamp() throws Exception { json.put("test_timezone", "2022-04-05 09:06:11 PST"); json.put("test_saformat", "2018/08/19 12:11"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestTimestamp.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -792,7 +792,7 @@ public void testDate() throws Exception { json.put("test_string", "2021-11-04"); json.put("test_long", 18935L); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestDate.getDescriptor(), tableSchema, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestDate.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -804,7 +804,7 @@ public void testAllTypes() throws Exception { try { LOG.info("Testing " + json + " over " + entry.getKey().getFullName()); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json); +
JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json); LOG.info("Convert Success!"); assertEquals(protoMsg, AllTypesToCorrectProto.get(entry.getKey())[success]); success += 1; @@ -833,7 +833,7 @@ public void testAllRepeatedTypesWithLimits() throws Exception { try { LOG.info("Testing " + json + " over " + entry.getKey().getFullName()); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json); LOG.info("Convert Success!"); assertEquals( protoMsg.toString(), @@ -869,7 +869,7 @@ public void testOptional() throws Exception { json.put("byte", 1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -881,7 +881,7 @@ public void testRepeatedIsOptional() throws Exception { json.put("required_double", 1.1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestRepeatedIsOptional.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestRepeatedIsOptional.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -891,7 +891,7 @@ public void testRequired() throws Exception { json.put("optional_double", 1.1); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestRequired.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestRequired.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -911,7 +911,7 @@ public void testStructSimple() throws Exception { json.put("test_field_type", stringType); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -923,7 +923,7 @@ public void testStructSimpleFail() throws Exception { json.put("test_field_type", stringType); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1057,7 +1057,7 @@ public void testStructComplex() throws Exception { json.put("test_interval", "0-0 0 0:0:0.000005"); json.put("test_json", new JSONArray(new String[] {"{'a':'b'}"})); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( ComplexRoot.getDescriptor(), COMPLEX_TABLE_SCHEMA, json); assertEquals(expectedProto, protoMsg); } @@ -1083,7 +1083,7 @@ public void testStructComplexFail() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(ComplexRoot.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexRoot.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1097,7 +1097,7 @@ public void testRepeatedWithMixedTypes() throws Exception { json.put("test_repeated", new JSONArray("[1.1, 2.2, true]")); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedDouble.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedDouble.getDescriptor(), json); 
       Assert.fail("should fail");
     } catch (IllegalArgumentException e) {
       assertEquals(
@@ -1140,7 +1140,7 @@ public void testNestedRepeatedComplex() throws Exception {
     json.put("repeated_string", jsonRepeatedString);

     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json);
     assertEquals(protoMsg, expectedProto);
   }
@@ -1159,7 +1159,7 @@ public void testNestedRepeatedComplexFail() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json);
       Assert.fail("should fail");
     } catch (IllegalArgumentException e) {
       assertEquals(
@@ -1181,7 +1181,7 @@ public void testEmptySecondLevelObject() throws Exception {
     json.put("complex_lvl2", complexLvl2);

     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1193,10 +1193,10 @@ public void testAllowUnknownFieldsError() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json);
       Assert.fail("Should fail");
-    } catch (Exceptions.JsonDataHasUnknownFieldException e) {
-      assertEquals("JSONObject has fields unknown to BigQuery: root.string.", e.getMessage());
+    } catch (Exceptions.DataHasUnknownFieldException e) {
+      assertEquals("Source object has fields unknown to BigQuery: root.string.", e.getMessage());
       assertEquals("root.string", e.getFieldName());
     }
   }
@@ -1207,7 +1207,7 @@ public void testEmptyProtoMessage() throws Exception {
     json.put("test_repeated", new JSONArray(new int[0]));

     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json);
     assertEquals(protoMsg.getAllFields().size(), 0);
   }
@@ -1216,7 +1216,7 @@ public void testEmptyJSONObject() throws Exception {
     JSONObject json = new JSONObject();
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), json);
       Assert.fail("Should fail");
     } catch (IllegalStateException e) {
       assertEquals("JSONObject is empty.", e.getMessage());
@@ -1227,7 +1227,7 @@ public void testNullJson() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), null);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), null);
       Assert.fail("Should fail");
     } catch (NullPointerException e) {
       assertEquals("JSONObject is null.", e.getMessage());
@@ -1238,7 +1238,7 @@ public void testNullDescriptor() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(null, new JSONObject());
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(null, new JSONObject());
       Assert.fail("Should fail");
     } catch (NullPointerException e) {
       assertEquals("Protobuf descriptor is null.", e.getMessage());
@@ -1255,11 +1255,11 @@ public void testAllowUnknownFieldsSecondLevel() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
       Assert.fail("Should fail");
     } catch (IllegalArgumentException e) {
       assertEquals(
-          "JSONObject has fields unknown to BigQuery: root.complex_lvl2.no_match.", e.getMessage());
+          "Source object has fields unknown to BigQuery: root.complex_lvl2.no_match.", e.getMessage());
     }
   }
@@ -1276,7 +1276,7 @@ public void testTopLevelMatchSecondLevelMismatch() throws Exception {
     json.put("complex_lvl2", complex_lvl2);

     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1287,7 +1287,7 @@ public void testJsonNullValue() throws Exception {
     json.put("long", JSONObject.NULL);
     json.put("int", 1);
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1298,7 +1298,7 @@ public void testJsonAllFieldsNullValue() throws Exception {
     json.put("long", JSONObject.NULL);
     json.put("int", JSONObject.NULL);
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1319,7 +1319,7 @@ public void testBadJsonFieldRepeated() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(RepeatedBytes.getDescriptor(), ts, json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedBytes.getDescriptor(), ts, json);
       Assert.fail("Should fail");
     } catch (Exceptions.FieldParseError ex) {
       assertEquals(ex.getBqType(), "NUMERIC");
@@ -1344,7 +1344,7 @@ public void testBadJsonFieldIntRepeated() throws Exception {
     try {
       DynamicMessage protoMsg =
-          JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+          JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
       Assert.fail("Should fail");
     } catch (IllegalArgumentException ex) {
       assertEquals(ex.getMessage(), "Text 'blah' could not be parsed at index 0");
@@ -1375,7 +1375,7 @@ public void testNullRepeatedField() throws Exception {
     json.put("test_repeated", JSONObject.NULL);
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
     assertTrue(protoMsg.getAllFields().isEmpty());
     // Missing repeated field.
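[Reviewer note, not part of the patch] The hunks above move conversion from the removed static convertJsonToProtoMessage to the JsonToProtoMessage.INSTANCE singleton and rename JsonDataHasUnknownFieldException to DataHasUnknownFieldException. A minimal sketch of how calling code migrates, mirroring testAllowUnknownFieldsError; RepeatedInt64 is one of the generated test protos used in this file, and the JSON field name is illustrative:

    JSONObject row = new JSONObject();
    row.put("string", "not-in-schema"); // field unknown to the RepeatedInt64 schema
    try {
      DynamicMessage msg =
          JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), row);
    } catch (Exceptions.DataHasUnknownFieldException e) {
      // Renamed from JsonDataHasUnknownFieldException; the offending path is still reported.
      assertEquals("root.string", e.getFieldName());
    }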
@@ -1383,7 +1383,7 @@ public void testNullRepeatedField() throws Exception {
     json.put("test_non_repeated", JSONObject.NULL);
     protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
     assertTrue(protoMsg.getAllFields().isEmpty());
   }
@@ -1406,10 +1406,10 @@ public void testDoubleAndFloatToNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("numeric", new Double(24.678));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
     json.put("numeric", new Float(24.678));
-    protoMsg = JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+    protoMsg = JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1434,7 +1434,7 @@ public void testBigDecimalToBigNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("bignumeric", Collections.singletonList(new BigDecimal("24.6789012345")));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1458,11 +1458,11 @@ public void testDoubleAndFloatToRepeatedBigNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("bignumeric", Collections.singletonList(new Double(24.678)));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
     json.put("bignumeric", Collections.singletonList(new Float(24.678)));
     protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
 }
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
index c340d22e9a..9827e72588 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
@@ -618,7 +618,7 @@ public void testMixedCasedFieldNames() throws Exception {
     json.put("fooBar", "hello");

     DynamicMessage protoMsg =
-        com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessage(
+        com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.INSTANCE.convertToProtoMessage(
             TestMixedCaseFieldNames.getDescriptor(), tableSchema, json);
   }
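[Reviewer note, not part of the patch] Both converter entry points exercised by these tests, with and without a TableSchema, move to the INSTANCE singleton; the v1beta2 test above now delegates to the v1 class. A before/after sketch using the TestInt64 test proto and the field names exercised in testJsonNullValue; the tableSchema variable stands for a TableSchema matching the descriptor and is assumed here:

    JSONObject json = new JSONObject();
    json.put("long", 123L);
    json.put("int", 1);

    // Before (removed):
    //   DynamicMessage msg =
    //       JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
    // After, descriptor-only and schema-aware variants:
    DynamicMessage msg =
        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
    DynamicMessage msgWithSchema =
        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), tableSchema, json);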