Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose settings to configure default missing value interpretation. #2230

Merged
merged 6 commits into from Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -388,6 +389,11 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setWriteStream(streamWriter.getStreamName());
requestBuilder.putAllMissingValueInterpretations(
streamWriter.getMissingValueInterpretationMap());
if (streamWriter.getDefaultValueInterpretation()
!= MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
requestBuilder.setDefaultMissingValueInterpretation(
streamWriter.getDefaultValueInterpretation());
}
return appendInternal(streamWriter, requestBuilder.build());
}

Expand Down
Expand Up @@ -346,6 +346,22 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
* Sets the default missing value interpretation value if the column is not presented in the
* missing_value_interpretations map.
*
* <p>If this value is set to `DEFAULT_VALUE`, we will always populate default value if the
* field is missing from json and default value is defined in the column.
*
* <p>If this value is set to `NULL_VALUE`, we will always not populate default value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the comment of 'NULL_VALUE' to: we will always populate NULL to the missing fields, which is currently our default behavior.

*/
public Builder setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
this.schemaAwareStreamWriterBuilder.setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation);
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -97,6 +98,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.compressorName);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -433,6 +436,9 @@ public static final class Builder<T> {
private String location;
private String compressorName;

private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
Expand Down Expand Up @@ -627,6 +633,16 @@ public Builder<T> setCompressorName(String compressorName) {
return this;
}

/**
* Sets the default missing value interpretation value if the column is not presented in the
* missing_value_interpretations map.
*/
public Builder setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
Expand Down Expand Up @@ -90,6 +91,13 @@ public class StreamWriter implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/**
* The default missing value interpretation if the column has default value defined but not
* presented in the missing value map.
*/
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

/**
* Stream can access a single connection or a pool of connection depending on whether multiplexing
* is enabled.
Expand Down Expand Up @@ -201,6 +209,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
if (!builder.enableConnectionPool) {
this.location = builder.location;
Expand Down Expand Up @@ -312,6 +321,10 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
return defaultMissingValueInterpretation;
}

static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
Expand Down Expand Up @@ -602,6 +615,10 @@ public static final class Builder {

private String compressorName = null;

// Default missing value interpretation value.
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -729,6 +746,16 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
* Sets the default missing value interpretation value if the column is not presented in the
* missing_value_interpretations map.
*/
public Builder setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.RepetitionType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
Expand All @@ -45,8 +46,10 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -64,6 +67,7 @@

@RunWith(JUnit4.class)
public class JsonStreamWriterTest {

private static final int NUMERIC_SCALE = 9;
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
Expand Down Expand Up @@ -514,6 +518,9 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getSerializedRows(i),
expectedProto.toByteString());
}
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getDefaultMissingValueInterpretation(),
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
}
}

Expand Down Expand Up @@ -1015,6 +1022,79 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception {
writer2.close();
}

@Test
public void testMissingValueInterpretation_multiplexingCase() throws Exception {
// Set min connection count to be 1 to force sharing connection.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
// The following two writers have different stream name and schema, but will share the same
// connection .
JsonStreamWriter writer1 =
getTestJsonStreamWriterBuilder(TEST_STREAM)
.setEnableConnectionPool(true)
.setLocation("us")
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build();
JsonStreamWriter writer2 =
getTestJsonStreamWriterBuilder(TEST_STREAM_2)
.setEnableConnectionPool(true)
.setLocation("us")
.setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
.build();

long appendCountPerStream = 5;
for (int i = 0; i < appendCountPerStream * 4; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

JSONObject foo = new JSONObject();
foo.put("foo", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
// In total insert append `appendCountPerStream` * 4 requests.
// We insert using the pattern of
// jsonStreamWriter1, jsonStreamWriter1, jsonStreamWriter2, jsonStreamWriter2
for (int i = 0; i < appendCountPerStream; i++) {
ApiFuture<AppendRowsResponse> appendFuture1 = writer1.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture2 = writer1.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture3 = writer2.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture4 = writer2.append(jsonArr);
appendFuture1.get();
appendFuture2.get();
appendFuture3.get();
appendFuture4.get();
}

for (int i = 0; i < appendCountPerStream * 4; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
if (i % 4 <= 1) {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.DEFAULT_VALUE);
} else {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.NULL_VALUE);
}
}

writer1.close();
writer2.close();
}

@Test
public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() throws Exception {
// Set min connection count to be 1 to force sharing connection.
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
Expand Down Expand Up @@ -849,6 +850,73 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
}
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
}

writer1.close();
writer2.close();
}

@Test
public void testDefaultValueInterpretation_multiplexingCase() throws Exception {
// Use the shared connection mode.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
ProtoSchema schema1 = createProtoSchema("Schema1");
ProtoSchema schema2 = createProtoSchema("Schema2");
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(schema1)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(schema2)
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setLocation("US")
.setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
.build();

long appendCountPerStream = 5;
for (int i = 0; i < appendCountPerStream * 4; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert append `appendCountPerStream` * 4 requests.
// We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2
for (int i = 0; i < appendCountPerStream; i++) {
ApiFuture<AppendRowsResponse> appendFuture1 =
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4);
ApiFuture<AppendRowsResponse> appendFuture2 =
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1);
ApiFuture<AppendRowsResponse> appendFuture3 =
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2);
ApiFuture<AppendRowsResponse> appendFuture4 =
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3);
appendFuture1.get();
appendFuture2.get();
appendFuture3.get();
appendFuture4.get();
}

for (int i = 0; i < appendCountPerStream * 4; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i % 4 <= 1) {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.DEFAULT_VALUE);
} else {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.NULL_VALUE);
}
}

writer1.close();
Expand Down