Skip to content

Commit

Permalink
feat: expose configuration to config the default missing value
Browse files Browse the repository at this point in the history
interpretation
  • Loading branch information
GaoleMeng committed Aug 16, 2023
1 parent 7a95278 commit e27ba5f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 0 deletions.
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -388,6 +389,11 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setWriteStream(streamWriter.getStreamName());
requestBuilder.putAllMissingValueInterpretations(
streamWriter.getMissingValueInterpretationMap());
if (streamWriter.getDefaultValueInterpretation()
!= MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
requestBuilder.setDefaultMissingValueInterpretation(
streamWriter.getDefaultValueInterpretation());
}
return appendInternal(streamWriter, requestBuilder.build());
}

Expand Down
Expand Up @@ -346,6 +346,22 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
 * Configures the interpretation applied to a column that has no entry in the
 * missing_value_interpretations map.
 *
 * <p>With {@code DEFAULT_VALUE}, a field that is missing from the json is populated with the
 * column's default value whenever the column defines one.
 *
 * <p>With {@code NULL_VALUE}, the default value is never populated.
 *
 * @param defaultMissingValueInterpretation the interpretation handed to the underlying
 *     schema-aware writer builder
 * @return this builder, to allow call chaining
 */
public Builder setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  // This builder keeps no copy of the value; it is forwarded to the wrapped builder.
  schemaAwareStreamWriterBuilder.setDefaultMissingValueInterpretation(
      defaultMissingValueInterpretation);
  return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -97,6 +98,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.compressorName);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -433,6 +436,9 @@ public static final class Builder<T> {
private String location;
private String compressorName;

// Interpretation for columns absent from the missing value interpretation map. Remains
// unspecified unless setDefaultMissingValueInterpretation is called on this builder.
private MissingValueInterpretation defaultMissingValueInterpretation =
    MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
Expand Down Expand Up @@ -627,6 +633,16 @@ public Builder<T> setCompressorName(String compressorName) {
return this;
}

/**
 * Sets the default missing value interpretation, used for any column that is not present in the
 * missing_value_interpretations map.
 *
 * @param defaultMissingValueInterpretation the interpretation to pass on to the underlying
 *     stream writer when this builder is built
 * @return this builder, to allow call chaining
 */
public Builder<T> setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  // Return the parameterized Builder<T>, as the other setters do, so that a fluent chain does
  // not degrade to the raw type after this call.
  this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
  return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
Expand Down Expand Up @@ -90,6 +91,13 @@ public class StreamWriter implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/**
 * Default missing value interpretation for a column that has a default value defined but is not
 * present in the missing value map. Unspecified unless the builder supplies another value.
 */
private MissingValueInterpretation defaultMissingValueInterpretation =
    MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

/**
* Stream can access a single connection or a pool of connection depending on whether multiplexing
* is enabled.
Expand Down Expand Up @@ -201,6 +209,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
if (!builder.enableConnectionPool) {
this.location = builder.location;
Expand Down Expand Up @@ -312,6 +321,10 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

/** Returns the default missing value interpretation held by this writer. */
AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
  return this.defaultMissingValueInterpretation;
}

static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
Expand Down Expand Up @@ -602,6 +615,10 @@ public static final class Builder {

private String compressorName = null;

// Default missing value interpretation; left unspecified until
// setDefaultMissingValueInterpretation is called.
private MissingValueInterpretation defaultMissingValueInterpretation =
    MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -729,6 +746,16 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
 * Sets the default missing value interpretation, used for any column that is not present in the
 * missing_value_interpretations map.
 *
 * @param defaultMissingValueInterpretation the interpretation to use; must not be null
 * @return this builder, to allow call chaining
 * @throws NullPointerException if {@code defaultMissingValueInterpretation} is null
 */
public Builder setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  // Reject null here: a null would compare unequal to the UNSPECIFIED sentinel and then be
  // passed to the request builder on every append, far from the call that introduced it.
  this.defaultMissingValueInterpretation =
      java.util.Objects.requireNonNull(
          defaultMissingValueInterpretation, "defaultMissingValueInterpretation must not be null");
  return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
Expand Down Expand Up @@ -855,6 +856,63 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
writer2.close();
}

@Test
public void testDefaultValueInterpretation_multiplexingCase() throws Exception {
  // Checks that two writers going through the connection pool each send their own default
  // missing value interpretation.
  // Use the shared connection mode.
  ConnectionWorkerPool.setOptions(
      Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
  ProtoSchema schema1 = createProtoSchema("Schema1");
  ProtoSchema schema2 = createProtoSchema("Schema2");
  StreamWriter writer1 =
      StreamWriter.newBuilder(TEST_STREAM_1, client)
          .setWriterSchema(schema1)
          .setLocation("US")
          .setEnableConnectionPool(true)
          .setMaxInflightRequests(1)
          .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
          .build();
  StreamWriter writer2 =
      StreamWriter.newBuilder(TEST_STREAM_2, client)
          .setWriterSchema(schema2)
          .setMaxInflightRequests(1)
          .setEnableConnectionPool(true)
          .setLocation("US")
          .setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
          .build();

  long appendCountPerStream = 5;
  for (int i = 0; i < appendCountPerStream * 4; i++) {
    testBigQueryWrite.addResponse(createAppendResponse(i));
  }

  List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
  // Send `appendCountPerStream` * 4 append requests in total, following the pattern
  // writer1, writer1, writer2, writer2.
  for (int i = 0; i < appendCountPerStream; i++) {
    futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4));
    futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1));
    futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2));
    futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3));
  }

  // Wait for every append to complete before reading the recorded requests, so the inspection
  // below cannot run ahead of a request that is still in flight.
  for (ApiFuture<AppendRowsResponse> future : futures) {
    future.get();
  }

  for (int i = 0; i < appendCountPerStream * 4; i++) {
    AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
    assertEquals(i, appendRowsRequest.getOffset().getValue());
    // Offsets 4k and 4k + 1 were sent by writer1 (DEFAULT_VALUE); 4k + 2 and 4k + 3 by writer2
    // (NULL_VALUE).
    MissingValueInterpretation expectedInterpretation =
        (i % 4 <= 1)
            ? MissingValueInterpretation.DEFAULT_VALUE
            : MissingValueInterpretation.NULL_VALUE;
    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
    assertEquals(
        expectedInterpretation, appendRowsRequest.getDefaultMissingValueInterpretation());
  }

  writer1.close();
  writer2.close();
}

@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriter writer =
Expand Down

0 comments on commit e27ba5f

Please sign in to comment.