Skip to content

Commit

Permalink
feat: add schema update support in JsonStreamWriter (#1447)
Browse files Browse the repository at this point in the history
* feat: add schema update support in JsonStreamWriter

* fix unit tests

* fix lint

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* update concurrency logic and add corresponding testcase

* update getUpdatedSchema() in StreamWriter.java to use lock; update concurrent test case

* simplify logic

* add unit test

* update

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
stephaniewang526 and gcf-owl-bot[bot] committed Dec 29, 2021
1 parent b774f5d commit 973afcc
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
Expand All @@ -34,7 +35,10 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables.
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions, but also provides an additional feature: schema update support, where if the BigQuery
* table schema is updated, users will be able to ingest data on the new schema after some time (in
* order of minutes).
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
Expand Down Expand Up @@ -81,27 +85,49 @@ private JsonStreamWriter(Builder builder)
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at current end
* of stream.
* of stream. If there is a schema update, the current StreamWriter is closed. A new StreamWriter
* is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
throws IOException, DescriptorValidationException {
return append(jsonArr, -1);
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at the
* specified offset.
* specified offset. If there is a schema update, the current StreamWriter is closed. A new
* StreamWriter is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
.build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
Expand Down Expand Up @@ -155,9 +181,9 @@ private void setStreamWriterSettings(
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceId != null) {
streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("JsonWriterBeta:null");
streamWriterBuilder.setTraceId("JsonWriter:null");
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
*/
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
Expand Down Expand Up @@ -135,6 +133,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
Expand Down Expand Up @@ -528,6 +532,9 @@ private void cleanupInflightRequests() {
private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema = response.getUpdatedSchema();
}
try {
// Had a successful connection with at least one result, reset retries.
// conectionRetryCountWithoutCallback is reset so that only multiple retries, without
Expand Down Expand Up @@ -624,7 +631,12 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
/** Thread-safe getter of updated TableSchema */
public synchronized TableSchema getUpdatedSchema() {
return this.updatedSchema;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand All @@ -651,6 +663,8 @@ public static final class Builder {

private String traceId = null;

private TableSchema updatedTableSchema = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -26,6 +27,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -193,7 +195,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

Expand Down Expand Up @@ -284,8 +286,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
Expand Down Expand Up @@ -388,4 +389,111 @@ public void testCreateDefaultStream() throws Exception {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
}
}

@Test
public void testSimpleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
.build());
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
FooType.newBuilder().setFoo("aaa").build().toByteString());

assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRows(0),
FooType.newBuilder().setFoo("aaa").build().toByteString());

// Second append with updated schema.
JSONObject updatedFoo = new JSONObject();
updatedFoo.put("foo", "aaa");
updatedFoo.put("bar", "bbb");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
assertEquals(4, testBigQueryWrite.getAppendRequests().size());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(3)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(3)
.getProtoRows()
.getRows()
.getSerializedRows(0),
UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString());

assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
assertTrue(
testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
}
}
}
Loading

0 comments on commit 973afcc

Please sign in to comment.