diff --git a/README.md b/README.md index 7c1c7f1ac0..1fab747ef2 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:24.2.0') +implementation platform('com.google.cloud:libraries-bom:24.3.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 31d1d2493f..b322ea3bca 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -53,6 +53,10 @@ public class JsonStreamWriter implements AutoCloseable { private Descriptor descriptor; private TableSchema tableSchema; private boolean ignoreUnknownFields = false; + private boolean reconnectAfter10M = false; + private long totalMessageSize = 0; + private long absTotal = 0; + private ProtoSchema protoSchema; /** * Constructs the JsonStreamWriter @@ -71,7 +75,9 @@ private JsonStreamWriter(Builder builder) } else { streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); } - streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); + streamWriterBuilder.setWriterSchema(protoSchema); setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, @@ -82,6 +88,7 @@ private JsonStreamWriter(Builder builder) this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; this.ignoreUnknownFields = builder.ignoreUnknownFields; + 
this.reconnectAfter10M = builder.reconnectAfter10M; } /** @@ -122,11 +129,10 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.tableSchema = updatedSchema; this.descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor - this.streamWriter = - streamWriterBuilder - .setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)) - .build(); + this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); } } @@ -134,15 +140,35 @@ public ApiFuture append(JSONArray jsonArr, long offset) // Any error in convertJsonToProtoMessage will throw an // IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing // of JSON data. + long currentRequestSize = 0; for (int i = 0; i < jsonArr.length(); i++) { JSONObject json = jsonArr.getJSONObject(i); Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage( this.descriptor, this.tableSchema, json, ignoreUnknownFields); rowsBuilder.addSerializedRows(protoMessage.toByteString()); + currentRequestSize += protoMessage.getSerializedSize(); } // Need to make sure refreshAppendAndSetDescriptor finish first before this can run synchronized (this) { + this.totalMessageSize += currentRequestSize; + this.absTotal += currentRequestSize; + // Reconnect on every 9.5MB. + if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) { + streamWriter.close(); + // Create a new underlying StreamWriter aka establish a new connection. + this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); + this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; + this.absTotal += currentRequestSize; + // Allow first request to pass. 
+ } + LOG.fine( + "Sending a total of:" + + this.totalMessageSize + + " " + + currentRequestSize + + " " + + this.absTotal); final ApiFuture<AppendRowsResponse> appendResponseFuture = this.streamWriter.append(rowsBuilder.build(), offset); return appendResponseFuture; @@ -264,6 +290,7 @@ public static final class Builder { private boolean createDefaultStream = false; private String traceId; private boolean ignoreUnknownFields = false; + private boolean reconnectAfter10M = false; private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -377,6 +404,19 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { return this; } + /** + * Setter for a reconnectAfter10M, temporary workaround for omg/48020. Fix for the omg is + * supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be slower + * (0.75MB/s per connection), but your writes will not be stuck as a symptom of omg/48020. + * + * @param reconnectAfter10M + * @return Builder + */ + public Builder setReconnectAfter10M(boolean reconnectAfter10M) { + this.reconnectAfter10M = reconnectAfter10M; + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index b20da6cbb4..796e1acb29 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -412,6 +412,58 @@ public void testJsonStreamWriterWithDefaultStream() } } + // This test runs about 1 min. 
+ @Test + public void testJsonStreamWriterWithMessagesOver10M() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "TableLarge"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + int totalRequest = 10; + int rowBatch = 40000; + ArrayList<ApiFuture<AppendRowsResponse>> allResponses = + new ArrayList<ApiFuture<AppendRowsResponse>>(totalRequest); + // Sends a total of 30MB over the wire. + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setReconnectAfter10M(true) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. 
+ for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + totalRequest); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); + } + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + @Test public void testJsonStreamWriterSchemaUpdate() throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {