Skip to content

Commit

Permalink
feat: add a Flush API to enable finer grained data commit needs for d…
Browse files Browse the repository at this point in the history
…ataflow. (#272)

fix: add resource definition for Table/ReadStream/WriteStream message
fix: add proper resource_reference for messages
chore: update copyright

committer: @xiaozhenliugg
PiperOrigin-RevId: 311188524

Source-Author: Google APIs <noreply@google.com>
Source-Date: Tue May 12 13:14:37 2020 -0700
Source-Repo: googleapis/googleapis
Source-Sha: bf17ae5fd93929beb44ac4c6b04f5088c3ee4a02
Source-Link: googleapis/googleapis@bf17ae5
  • Loading branch information
yoshi-automation committed May 13, 2020
1 parent 9c9471a commit b1c827f
Show file tree
Hide file tree
Showing 11 changed files with 1,911 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStub;
Expand Down Expand Up @@ -591,6 +593,108 @@ public final BatchCommitWriteStreamsResponse batchCommitWriteStreams(
return stub.batchCommitWriteStreamsCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream);
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(WriteStreamName writeStream) {
FlushRowsRequest request =
FlushRowsRequest.newBuilder()
.setWriteStream(writeStream == null ? null : writeStream.toString())
.build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream.toString());
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(String writeStream) {
FlushRowsRequest request = FlushRowsRequest.newBuilder().setWriteStream(writeStream).build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* FlushRowsResponse response = bigQueryWriteClient.flushRows(request);
* }
* </code></pre>
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(FlushRowsRequest request) {
return flushRowsCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* ApiFuture&lt;FlushRowsResponse&gt; future = bigQueryWriteClient.flushRowsCallable().futureCall(request);
* // Do something
* FlushRowsResponse response = future.get();
* }
* </code></pre>
*/
public final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return stub.flushRowsCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStubSettings;
Expand Down Expand Up @@ -102,6 +104,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return ((BigQueryWriteStubSettings) getStubSettings()).batchCommitWriteStreamsSettings();
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return ((BigQueryWriteStubSettings) getStubSettings()).flushRowsSettings();
}

public static final BigQueryWriteSettings create(BigQueryWriteStubSettings stub)
throws IOException {
return new BigQueryWriteSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -229,6 +236,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return getStubSettingsBuilder().batchCommitWriteStreamsSettings();
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return getStubSettingsBuilder().flushRowsSettings();
}

@Override
public BigQueryWriteSettings build() throws IOException {
return new BigQueryWriteSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import javax.annotation.Generated;
Expand Down Expand Up @@ -62,6 +64,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
throw new UnsupportedOperationException("Not implemented: batchCommitWriteStreamsCallable()");
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
throw new UnsupportedOperationException("Not implemented: flushRowsCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class BigQueryWriteStubSettings extends StubSettings<BigQueryWriteStubSet
finalizeWriteStreamSettings;
private final UnaryCallSettings<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

/** Returns the object with the settings used for calls to createWriteStream. */
public UnaryCallSettings<CreateWriteStreamRequest, WriteStream> createWriteStreamSettings() {
Expand Down Expand Up @@ -127,6 +130,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return batchCommitWriteStreamsSettings;
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public BigQueryWriteStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -201,6 +209,7 @@ protected BigQueryWriteStubSettings(Builder settingsBuilder) throws IOException
getWriteStreamSettings = settingsBuilder.getWriteStreamSettings().build();
finalizeWriteStreamSettings = settingsBuilder.finalizeWriteStreamSettings().build();
batchCommitWriteStreamsSettings = settingsBuilder.batchCommitWriteStreamsSettings().build();
flushRowsSettings = settingsBuilder.flushRowsSettings().build();
}

/** Builder for BigQueryWriteStubSettings. */
Expand All @@ -218,6 +227,7 @@ public static class Builder extends StubSettings.Builder<BigQueryWriteStubSettin
private final UnaryCallSettings.Builder<
BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;
Expand Down Expand Up @@ -270,12 +280,15 @@ protected Builder(ClientContext clientContext) {

batchCommitWriteStreamsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

flushRowsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);

initDefaults(this);
}
Expand Down Expand Up @@ -311,6 +324,11 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

builder
.flushRowsSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

return builder;
}

Expand All @@ -322,13 +340,15 @@ protected Builder(BigQueryWriteStubSettings settings) {
getWriteStreamSettings = settings.getWriteStreamSettings.toBuilder();
finalizeWriteStreamSettings = settings.finalizeWriteStreamSettings.toBuilder();
batchCommitWriteStreamsSettings = settings.batchCommitWriteStreamsSettings.toBuilder();
flushRowsSettings = settings.flushRowsSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);
}

// NEXT_MAJOR_VER: remove 'throws Exception'
Expand Down Expand Up @@ -377,6 +397,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return batchCommitWriteStreamsSettings;
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@Override
public BigQueryWriteStubSettings build() throws IOException {
return new BigQueryWriteStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -103,6 +105,14 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
.setResponseMarshaller(
ProtoUtils.marshaller(BatchCommitWriteStreamsResponse.getDefaultInstance()))
.build();
private static final MethodDescriptor<FlushRowsRequest, FlushRowsResponse>
flushRowsMethodDescriptor =
MethodDescriptor.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.bigquery.storage.v1alpha2.BigQueryWrite/FlushRows")
.setRequestMarshaller(ProtoUtils.marshaller(FlushRowsRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(FlushRowsResponse.getDefaultInstance()))
.build();

private final BackgroundResource backgroundResources;

Expand All @@ -113,6 +123,7 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
finalizeWriteStreamCallable;
private final UnaryCallable<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsCallable;
private final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable;

private final GrpcStubCallableFactory callableFactory;

Expand Down Expand Up @@ -212,6 +223,19 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
}
})
.build();
GrpcCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsTransportSettings =
GrpcCallSettings.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setMethodDescriptor(flushRowsMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<FlushRowsRequest>() {
@Override
public Map<String, String> extract(FlushRowsRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("write_stream", String.valueOf(request.getWriteStream()));
return params.build();
}
})
.build();

this.createWriteStreamCallable =
callableFactory.createUnaryCallable(
Expand All @@ -234,6 +258,9 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
batchCommitWriteStreamsTransportSettings,
settings.batchCommitWriteStreamsSettings(),
clientContext);
this.flushRowsCallable =
callableFactory.createUnaryCallable(
flushRowsTransportSettings, settings.flushRowsSettings(), clientContext);

backgroundResources = new BackgroundResourceAggregation(clientContext.getBackgroundResources());
}
Expand All @@ -260,6 +287,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
return batchCommitWriteStreamsCallable;
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return flushRowsCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Loading

0 comments on commit b1c827f

Please sign in to comment.