Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit f617259

Browse files
authored
feat: Add flush API to StreamWriter (#278)
* feat: Add a flush method to client library. The implementation is void, so just added boilerplate unit test and e2e test. modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java * fix a test failure due to table reuse, need to create a table in the test
1 parent 873d787 commit f617259

File tree

4 files changed

+82
-0
lines changed

4 files changed

+82
-0
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,29 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
243243
return outstandingAppend.appendResult;
244244
}
245245

246+
/**
247+
* Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be
248+
* available for read. If no exception is thrown, it means the flush happened.
249+
*
250+
* <p>NOTE: Currently the implementation is void, BUFFERED steam acts like COMMITTED stream. It is
251+
* just for Dataflow team to mock the usage.
252+
*
253+
* @param offset Offset to which the rows will be committed to the system. It must fall within the
254+
* row counts on the stream.
255+
* @throws IllegalArgumentException if offset is invalid
256+
*/
257+
public void flush(long offset) {
258+
if (offset < 0) {
259+
throw new IllegalArgumentException("Invalid offset: " + offset);
260+
}
261+
// TODO: Once we persisted stream type, we should check the call can only be issued on BUFFERED
262+
// stream here.
263+
Storage.FlushRowsRequest request =
264+
Storage.FlushRowsRequest.newBuilder().setWriteStream(streamName).setOffset(offset).build();
265+
stub.flushRows(request);
266+
// TODO: We will verify if the returned offset is equal to requested offset.
267+
}
268+
246269
/**
247270
* Re-establishes a stream connection.
248271
*

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public void addResponse(AbstractMessage response) {
5454
serviceImpl.addResponse((AppendRowsResponse) response);
5555
} else if (response instanceof Stream.WriteStream) {
5656
serviceImpl.addWriteStreamResponse((Stream.WriteStream) response);
57+
} else if (response instanceof FlushRowsResponse) {
58+
serviceImpl.addFlushRowsResponse((FlushRowsResponse) response);
5759
} else {
5860
throw new IllegalStateException("Unsupported service");
5961
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
3838
private final LinkedBlockingQueue<AppendRowsRequest> requests = new LinkedBlockingQueue<>();
3939
private final LinkedBlockingQueue<GetWriteStreamRequest> writeRequests =
4040
new LinkedBlockingQueue<>();
41+
private final LinkedBlockingQueue<FlushRowsRequest> flushRequests = new LinkedBlockingQueue<>();
4142
private final LinkedBlockingQueue<Response> responses = new LinkedBlockingQueue<>();
4243
private final LinkedBlockingQueue<Stream.WriteStream> writeResponses =
4344
new LinkedBlockingQueue<>();
45+
private final LinkedBlockingQueue<FlushRowsResponse> flushResponses = new LinkedBlockingQueue<>();
4446
private final AtomicInteger nextMessageId = new AtomicInteger(1);
4547
private boolean autoPublishResponse;
4648
private ScheduledExecutorService executor = null;
@@ -97,6 +99,21 @@ public void getWriteStream(
9799
}
98100
}
99101

102+
@Override
103+
public void flushRows(
104+
FlushRowsRequest request, StreamObserver<FlushRowsResponse> responseObserver) {
105+
Object response = writeResponses.remove();
106+
if (response instanceof FlushRowsResponse) {
107+
flushRequests.add(request);
108+
responseObserver.onNext((FlushRowsResponse) response);
109+
responseObserver.onCompleted();
110+
} else if (response instanceof Exception) {
111+
responseObserver.onError((Exception) response);
112+
} else {
113+
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
114+
}
115+
}
116+
100117
@Override
101118
public StreamObserver<AppendRowsRequest> appendRows(
102119
final StreamObserver<AppendRowsResponse> responseObserver) {
@@ -173,6 +190,11 @@ public FakeBigQueryWriteImpl addWriteStreamResponse(Stream.WriteStream response)
173190
return this;
174191
}
175192

193+
public FakeBigQueryWriteImpl addFlushRowsResponse(FlushRowsResponse response) {
194+
flushResponses.add(response);
195+
return this;
196+
}
197+
176198
public FakeBigQueryWriteImpl addConnectionError(Throwable error) {
177199
responses.add(new Response(error));
178200
return this;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,4 +399,39 @@ public Long call() throws IOException, InterruptedException, ExecutionException
399399
}
400400
DirectWriter.clearCache();
401401
}
402+
403+
@Test
404+
public void testFlushRows() throws IOException, InterruptedException, ExecutionException {
405+
String tableName = "BufferTable";
406+
TableInfo tableInfo =
407+
TableInfo.newBuilder(
408+
TableId.of(DATASET, tableName),
409+
StandardTableDefinition.of(
410+
Schema.of(
411+
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
412+
.build())))
413+
.build();
414+
bigquery.create(tableInfo);
415+
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
416+
WriteStream writeStream =
417+
client.createWriteStream(
418+
CreateWriteStreamRequest.newBuilder()
419+
.setParent(parent.toString())
420+
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build())
421+
.build());
422+
try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
423+
ApiFuture<AppendRowsResponse> response =
424+
streamWriter.append(
425+
createAppendRequest(writeStream.getName(), new String[] {"aaa"})
426+
.setOffset(Int64Value.of(0L))
427+
.build());
428+
assertEquals(0L, response.get().getOffset());
429+
streamWriter.flush(0);
430+
}
431+
TableResult result =
432+
bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
433+
Iterator<FieldValueList> iter = result.getValues().iterator();
434+
assertEquals("aaa", iter.next().get(0).getStringValue());
435+
assertEquals(false, iter.hasNext());
436+
}
402437
}

0 commit comments

Comments
 (0)