Skip to content

Commit

Permalink
fix: migrate json writer to use StreamWriterV2 (#1058)
Browse files Browse the repository at this point in the history
* .

* .

* fix: move JsonWriter to use StreamWriterV2.

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed May 11, 2021
1 parent 157a897 commit 586777f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 101 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteStub</className>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder createDefaultStream()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.json.JSONArray;
Expand All @@ -46,8 +45,8 @@ public class JsonStreamWriter implements AutoCloseable {

private BigQueryWriteClient client;
private String streamName;
private StreamWriter streamWriter;
private StreamWriter.Builder streamWriterBuilder;
private StreamWriterV2 streamWriter;
private StreamWriterV2.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;

Expand All @@ -64,18 +63,18 @@ private JsonStreamWriter(Builder builder)
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName, builder.client);
}
streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
builder.endpoint,
builder.flowControlSettings,
builder.createDefaultStream);
builder.flowControlSettings);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
this.streamName = builder.streamName;
}

/**
Expand Down Expand Up @@ -109,17 +108,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
synchronized (this) {
data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(request.build());
this.streamWriter.append(rowsBuilder.build(), offset);
return appendResponseFuture;
}
}
Expand All @@ -134,7 +126,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
void refreshConnection()
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
synchronized (this) {
this.streamWriter.shutdown();
this.streamWriter.close();
this.streamWriter = streamWriterBuilder.build();
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
Expand Down Expand Up @@ -164,41 +156,28 @@ private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
Boolean createDefaultStream) {
@Nullable FlowControlSettings flowControlSettings) {
if (channelProvider != null) {
streamWriterBuilder.setChannelProvider(channelProvider);
}
if (credentialsProvider != null) {
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
BatchingSettings.Builder batchSettingBuilder =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(4 * 1024 * 1024L);
if (flowControlSettings != null) {
streamWriterBuilder.setBatchingSettings(
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
} else {
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
}
if (endpoint != null) {
streamWriterBuilder.setEndpoint(endpoint);
}
if (createDefaultStream) {
streamWriterBuilder.createDefaultStream();
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
streamWriterBuilder.setMaxInflightBytes(
flowControlSettings.getMaxOutstandingRequestBytes());
}
if (flowControlSettings.getMaxOutstandingElementCount() != null) {
streamWriterBuilder.setMaxInflightRequests(
flowControlSettings.getMaxOutstandingElementCount());
}
}
}

/**
* Setter for table schema. Used for schema updates.
*
* @param tableSchema
*/
void setTableSchema(TableSchema tableSchema) {
this.tableSchema = tableSchema;
}

/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
Expand Down Expand Up @@ -259,7 +238,7 @@ public void close() {
}

public static final class Builder {
private String streamOrTableName;
private String streamName;
private BigQueryWriteClient client;
private TableSchema tableSchema;

Expand All @@ -269,6 +248,13 @@ public static final class Builder {
private String endpoint;
private boolean createDefaultStream = false;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";

private static Pattern streamPattern = Pattern.compile(streamPatternString);
private static Pattern tablePattern = Pattern.compile(tablePatternString);

/**
* Constructor for JsonStreamWriter's Builder
*
Expand All @@ -279,7 +265,17 @@ public static final class Builder {
* @param client
*/
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
this.streamOrTableName = streamOrTableName;
Matcher streamMatcher = streamPattern.matcher(streamOrTableName);
if (!streamMatcher.matches()) {
Matcher tableMatcher = tablePattern.matcher(streamOrTableName);
if (!tableMatcher.matches()) {
throw new IllegalArgumentException("Invalid name: " + streamOrTableName);
} else {
this.streamName = streamOrTableName + "/_default";
}
} else {
this.streamName = streamOrTableName;
}
this.tableSchema = tableSchema;
this.client = client;
}
Expand Down Expand Up @@ -322,13 +318,12 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
}

/**
* If it is writing to a default stream.
* Stream name on the builder.
*
* @return Builder
*/
public Builder createDefaultStream() {
this.createDefaultStream = true;
return this;
public String getStreamName() {
return streamName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ public void testCreateDefaultStream() throws Exception {
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, v2Schema)
.createDefaultStream()
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public void TestBigDecimalEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public void TestTimeEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put("test_str", "Start of the day");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;
import org.threeten.bp.LocalDateTime;

/** Integration tests for BigQuery Write API. */
Expand Down Expand Up @@ -180,54 +179,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType(
return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
}

@Test
public void testBatchWriteWithCommittedStream()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setRequestByteThreshold(1024 * 1024L) // 1 Mb
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(2))
.build())
.build()) {
LOG.info("Sending one message");
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response1 =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
Expand Down Expand Up @@ -389,7 +340,6 @@ public void testJsonStreamWriterWithDefaultStream()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) {
JsonStreamWriter.newBuilder(parentTable.toString(), schema).build()) {
// Append 10 JSON objects to the stream.
for (int i = 0; i < 10; i++) {
// Create a JSON object that is compatible with the table schema.
Expand Down

0 comments on commit 586777f

Please sign in to comment.