Skip to content

Commit

Permalink
feat: better default stream support in client library (#750)
Browse files Browse the repository at this point in the history
* .

* .

* .

* feat: Support default stream on streamWriter and jsonStreamWriter

* add integration test

* .

* .

* ci(java): ignore bot users for generate-files-bot (#749)

Depends on googleapis/repo-automation-bots#1254

Fixes googleapis/repo-automation-bots#1096

Source-Author: Jeff Ching <chingor@google.com>
Source-Date: Tue Dec 15 16:16:07 2020 -0800
Source-Repo: googleapis/synthtool
Source-Sha: 3f67ceece7e797a5736a25488aae35405649b90b
Source-Link: googleapis/synthtool@3f67cee

* .

* .

* chore: synthtool changes (#746)

* changes without context

        autosynth cannot find the source of changes triggered by earlier changes in this
        repository, or by version upgrades to tools such as linters.

* chore: migrate java-bigquerystorage to the Java microgenerator

Committer: @miraleung
PiperOrigin-RevId: 345311069

Source-Author: Google APIs <noreply@google.com>
Source-Date: Wed Dec 2 14:17:15 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: e39e42f368d236203a774ee994fcb4d730c33a83
Source-Link: googleapis/googleapis@e39e42f

* feat!: Updates to BigQuery Write API V1Beta2 public interface. This includes breaking changes to the API, it is fine because the API is not officially launched yet.

PiperOrigin-RevId: 345469340

Source-Author: Google APIs <noreply@google.com>
Source-Date: Thu Dec 3 09:33:11 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: b53c4d98aab1eae3dac90b37019dede686782f13
Source-Link: googleapis/googleapis@b53c4d9

* fix: Update gapic-generator-java to 0.0.7

Committer: @miraleung
PiperOrigin-RevId: 345476969

Source-Author: Google APIs <noreply@google.com>
Source-Date: Thu Dec 3 10:07:32 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: 7be2c821dd88109038c55c89f7dd48f092eeab9d
Source-Link: googleapis/googleapis@7be2c82

* chore: rollback migrating java-bigquerystorage to the Java microgenerator

Committer: @miraleung
PiperOrigin-RevId: 345522380

Source-Author: Google APIs <noreply@google.com>
Source-Date: Thu Dec 3 13:28:07 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: f8f975c7d43904e90d6c5f1684fdb6804400e641
Source-Link: googleapis/googleapis@f8f975c

* chore: migrate java-bigquerystorage to the Java microgenerator

Committer: @miraleung
PiperOrigin-RevId: 346405446

Source-Author: Google APIs <noreply@google.com>
Source-Date: Tue Dec 8 14:03:11 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: abc43060f136ce77124754a48f367102e646844a
Source-Link: googleapis/googleapis@abc4306

* chore: update gapic-generator-java to 0.0.11

Committer: @miraleung
PiperOrigin-RevId: 347036369

Source-Author: Google APIs <noreply@google.com>
Source-Date: Fri Dec 11 11:13:47 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: 6d65640b1fcbdf26ea76cb720de0ac138cae9bed
Source-Link: googleapis/googleapis@6d65640

* chore: update gapic-generator-java to 0.0.13

Committer: @miraleung
PiperOrigin-RevId: 347849179

Source-Author: Google APIs <noreply@google.com>
Source-Date: Wed Dec 16 10:28:38 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: 360a0e177316b7e9811f2ccbbef11e5f83377f3f
Source-Link: googleapis/googleapis@360a0e1

* .

* .

* .

* .

* .

* .

* .

* fix flushall test

Co-authored-by: Yoshi Automation Bot <yoshi-automation@google.com>
Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>
  • Loading branch information
3 people committed Dec 30, 2020
1 parent 0988105 commit 488f258
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 66 deletions.
4 changes: 0 additions & 4 deletions .github/generated-files-bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,3 @@ externalManifests:
- type: json
file: '.github/readme/synth.metadata/synth.metadata'
jsonpath: '$.generatedFiles[*]'
ignoreAuthors:
- 'renovate-bot'
- 'yoshi-automation'
- 'release-please[bot]'
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.collect.ImmutableMap;

/** Converts structure from BigQuery v2 API to BigQueryStorage API */
public class BQV2ToBQStorageConverter {
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
ImmutableMap.of(
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
.build();

/**
* Converts from bigquery v2 Table Schema to bigquery storage API Table Schema.
*
* @param schame the bigquery v2 Table Schema
* @return the bigquery storage API Table Schema
*/
public static TableSchema ConvertTableSchema(Schema schema) {
TableSchema.Builder result = TableSchema.newBuilder();
for (int i = 0; i < schema.getFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(schema.getFields().get(i)));
}
return result.build();
}

/**
* Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
*
* @param schame the bigquery v2 Field Schema
* @return the bigquery storage API Field Schema
*/
public static TableFieldSchema ConvertFieldSchema(Field field) {
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
if (field.getMode() == null) {
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
}
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
result.setName(field.getName());
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
if (field.getDescription() != null) {
result.setDescription(field.getDescription());
}
if (field.getSubFields() != null) {
for (int i = 0; i < field.getSubFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(field.getSubFields().get(i)));
}
}
return result.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.json.JSONArray;
Expand Down Expand Up @@ -62,21 +62,15 @@ public class JsonStreamWriter implements AutoCloseable {
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
Matcher matcher = streamPattern.matcher(builder.streamName);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid stream name: " + builder.streamName);
}

this.streamName = builder.streamName;
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

StreamWriter.Builder streamWriterBuilder;
if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client);
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
}
setStreamWriterSettings(
streamWriterBuilder,
Expand All @@ -85,9 +79,12 @@ private JsonStreamWriter(Builder builder)
builder.batchingSettings,
builder.retrySettings,
builder.executorProvider,
builder.endpoint);
builder.endpoint,
builder.createDefaultStream);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
Expand Down Expand Up @@ -126,12 +123,12 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
synchronized (this) {
data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(
AppendRowsRequest.newBuilder()
.setProtoRows(data.build())
.setOffset(Int64Value.of(offset))
.build());
this.streamWriter.append(request.build());
return appendResponseFuture;
}
}
Expand Down Expand Up @@ -179,7 +176,8 @@ private void setStreamWriterSettings(
@Nullable BatchingSettings batchingSettings,
@Nullable RetrySettings retrySettings,
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint) {
@Nullable String endpoint,
Boolean createDefaultStream) {
if (channelProvider != null) {
builder.setChannelProvider(channelProvider);
}
Expand All @@ -198,6 +196,9 @@ private void setStreamWriterSettings(
if (endpoint != null) {
builder.setEndpoint(endpoint);
}
if (createDefaultStream) {
builder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
Expand All @@ -217,34 +218,53 @@ void setTableSchema(TableSchema tableSchema) {
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
*
* @param streamName name of the stream that must follow
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or if it is default stream
* (createDefaultStream is true on builder), then the name here should be a table name
* ""projects/[^/]+/datasets/[^/]+/tables/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @return Builder
*/
public static Builder newBuilder(String streamOrTableName, TableSchema tableSchema) {
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
return new Builder(streamOrTableName, tableSchema, null);
}

/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
*
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @return Builder
*/
public static Builder newBuilder(String streamName, TableSchema tableSchema) {
Preconditions.checkNotNull(streamName, "StreamName is null.");
public static Builder newBuilder(String streamOrTableName, Schema tableSchema) {
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
return new Builder(streamName, tableSchema, null);
return new Builder(
streamOrTableName, BQV2ToBQStorageConverter.ConvertTableSchema(tableSchema), null);
}

/**
* newBuilder that constructs a JsonStreamWriter builder.
*
* @param streamName name of the stream that must follow
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @param client
* @return Builder
*/
public static Builder newBuilder(
String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamName, "StreamName is null.");
String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamOrTableName, "StreamName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkNotNull(client, "BigQuery client is null.");
return new Builder(streamName, tableSchema, client);
return new Builder(streamOrTableName, tableSchema, client);
}

/** Closes the underlying StreamWriter. */
Expand Down Expand Up @@ -287,7 +307,7 @@ public void run() {
}

public static final class Builder {
private String streamName;
private String streamOrTableName;
private BigQueryWriteClient client;
private TableSchema tableSchema;

Expand All @@ -297,17 +317,19 @@ public static final class Builder {
private RetrySettings retrySettings;
private ExecutorProvider executorProvider;
private String endpoint;
private boolean createDefaultStream = false;

/**
* Constructor for JsonStreamWriter's Builder
*
* @param streamName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/_default"
* @param tableSchema schema used to convert Json to proto messages.
* @param client
*/
private Builder(String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
this.streamName = streamName;
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
this.streamOrTableName = streamOrTableName;
this.tableSchema = tableSchema;
this.client = client;
}
Expand Down Expand Up @@ -371,6 +393,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}

/**
* If it is writing to a default stream.
*
* @return Builder
*/
public Builder createDefaultStream() {
this.createDefaultStream = true;
return this;
}

/**
* Setter for the underlying StreamWriter's Endpoint.
*
Expand Down
Loading

0 comments on commit 488f258

Please sign in to comment.