Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit f278775

Browse files
authored
feat!: remove deprecated append method in StreamWriterV2 (#924)
* feat!: remove the deprecated append method taking AppendRowsRequests in StreamWriterV2 * Rename private append to appendInternal * mark public method removal in clirr file
1 parent e8255e7 commit f278775

File tree

4 files changed

+47
-103
lines changed

4 files changed

+47
-103
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<differenceType>7002</differenceType>
2929
<method>void flushAll(long)</method>
3030
</difference>
31+
<difference>
32+
<className>com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2</className>
33+
<differenceType>7002</differenceType>
34+
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest)</method>
35+
</difference>
3136
<difference>
3237
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
3338
<differenceType>7002</differenceType>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ private StreamWriterV2(Builder builder) throws IOException {
143143
this.hasMessageInWaitingQueue = lock.newCondition();
144144
this.inflightReduced = lock.newCondition();
145145
this.streamName = builder.streamName;
146+
if (builder.writerSchema == null) {
147+
throw new StatusRuntimeException(
148+
Status.fromCode(Code.INVALID_ARGUMENT)
149+
.withDescription("Writer schema must be provided when building this writer."));
150+
}
146151
this.writerSchema = builder.writerSchema;
147152
this.maxInflightRequests = builder.maxInflightRequest;
148153
this.maxInflightBytes = builder.maxInflightBytes;
@@ -216,48 +221,15 @@ public void run() {
216221
* @return the append response wrapped in a future.
217222
*/
218223
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
219-
// TODO: Move this check to builder after the other append is removed.
220-
if (this.writerSchema == null) {
221-
throw new StatusRuntimeException(
222-
Status.fromCode(Code.INVALID_ARGUMENT)
223-
.withDescription("Writer schema must be provided when building this writer."));
224-
}
225224
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
226225
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
227226
if (offset >= 0) {
228227
requestBuilder.setOffset(Int64Value.of(offset));
229228
}
230-
return append(requestBuilder.build());
229+
return appendInternal(requestBuilder.build());
231230
}
232231

233-
/**
234-
* Schedules the writing of a message.
235-
*
236-
* <p>Example of writing a message.
237-
*
238-
* <pre>{@code
239-
* AppendRowsRequest message;
240-
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
241-
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
242-
* public void onSuccess(AppendRowsResponse response) {
243-
* if (!response.hasError()) {
244-
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
245-
* } else {
246-
* System.out.println("received an in stream error: " + response.getError().toString());
247-
* }
248-
* }
249-
*
250-
* public void onFailure(Throwable t) {
251-
* System.out.println("failed to write: " + t);
252-
* }
253-
* }, MoreExecutors.directExecutor());
254-
* }</pre>
255-
*
256-
* @param message the message in serialized format to write to BigQuery.
257-
* @return the append response wrapped in a future.
258-
*/
259-
@Deprecated
260-
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
232+
private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
261233
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
262234
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
263235
requestWrapper.appendResult.setException(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ public void tearDown() throws Exception {
8585
}
8686

8787
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
88-
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
88+
return StreamWriterV2.newBuilder(TEST_STREAM, client)
89+
.setWriterSchema(createProtoSchema())
90+
.setTraceId(TEST_TRACE_ID)
91+
.build();
8992
}
9093

9194
private ProtoSchema createProtoSchema() {
@@ -112,19 +115,6 @@ private ProtoRows createProtoRows(String[] messages) {
112115
return rowsBuilder.build();
113116
}
114117

115-
private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
116-
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
117-
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
118-
dataBuilder.setWriterSchema(createProtoSchema());
119-
if (offset > 0) {
120-
requestBuilder.setOffset(Int64Value.of(offset));
121-
}
122-
return requestBuilder
123-
.setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
124-
.setWriteStream(TEST_STREAM)
125-
.build();
126-
}
127-
128118
private AppendRowsResponse createAppendResponse(long offset) {
129119
return AppendRowsResponse.newBuilder()
130120
.setAppendResult(
@@ -139,7 +129,7 @@ private AppendRowsResponse createAppendResponseWithError(Status.Code code, Strin
139129
}
140130

141131
private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
142-
return writer.append(createAppendRequest(messages, -1));
132+
return writer.append(createProtoRows(messages), -1);
143133
}
144134

145135
private static <T extends Throwable> T assertFutureException(
@@ -201,6 +191,7 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
201191
StreamWriterV2.newBuilder(TEST_STREAM)
202192
.setCredentialsProvider(NoCredentialsProvider.create())
203193
.setChannelProvider(serviceHelper.createChannelProvider())
194+
.setWriterSchema(createProtoSchema())
204195
.build();
205196

206197
testBigQueryWrite.addResponse(createAppendResponse(0));
@@ -210,12 +201,8 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
210201
}
211202

212203
@Test
213-
public void testAppendWithRowsSuccess() throws Exception {
214-
StreamWriterV2 writer =
215-
StreamWriterV2.newBuilder(TEST_STREAM, client)
216-
.setWriterSchema(createProtoSchema())
217-
.setTraceId(TEST_TRACE_ID)
218-
.build();
204+
public void testAppendSuccess() throws Exception {
205+
StreamWriterV2 writer = getTestStreamWriterV2();
219206

220207
long appendCount = 100;
221208
for (int i = 0; i < appendCount; i++) {
@@ -237,38 +224,14 @@ public void testAppendWithRowsSuccess() throws Exception {
237224
}
238225

239226
@Test
240-
public void testAppendWithMessageSuccess() throws Exception {
241-
StreamWriterV2 writer = getTestStreamWriterV2();
242-
243-
long appendCount = 1000;
244-
for (int i = 0; i < appendCount; i++) {
245-
testBigQueryWrite.addResponse(createAppendResponse(i));
246-
}
247-
248-
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
249-
for (int i = 0; i < appendCount; i++) {
250-
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
251-
}
252-
253-
for (int i = 0; i < appendCount; i++) {
254-
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
255-
}
256-
257-
verifyAppendRequests(appendCount);
258-
259-
writer.close();
260-
}
261-
262-
@Test
263-
public void testAppendWithRowsNoSchema() throws Exception {
264-
final StreamWriterV2 writer = getTestStreamWriterV2();
227+
public void testNoSchema() throws Exception {
265228
StatusRuntimeException ex =
266229
assertThrows(
267230
StatusRuntimeException.class,
268231
new ThrowingRunnable() {
269232
@Override
270233
public void run() throws Throwable {
271-
writer.append(createProtoRows(new String[] {"A"}), -1);
234+
StreamWriterV2.newBuilder(TEST_STREAM, client).build();
272235
}
273236
});
274237
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
@@ -455,7 +418,10 @@ public void serverCloseWhileRequestsInflight() throws Exception {
455418
@Test
456419
public void testZeroMaxInflightRequests() throws Exception {
457420
StreamWriterV2 writer =
458-
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(0).build();
421+
StreamWriterV2.newBuilder(TEST_STREAM, client)
422+
.setWriterSchema(createProtoSchema())
423+
.setMaxInflightRequests(0)
424+
.build();
459425
testBigQueryWrite.addResponse(createAppendResponse(0));
460426
verifyAppendIsBlocked(writer);
461427
writer.close();
@@ -464,7 +430,10 @@ public void testZeroMaxInflightRequests() throws Exception {
464430
@Test
465431
public void testZeroMaxInflightBytes() throws Exception {
466432
StreamWriterV2 writer =
467-
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(0).build();
433+
StreamWriterV2.newBuilder(TEST_STREAM, client)
434+
.setWriterSchema(createProtoSchema())
435+
.setMaxInflightBytes(0)
436+
.build();
468437
testBigQueryWrite.addResponse(createAppendResponse(0));
469438
verifyAppendIsBlocked(writer);
470439
writer.close();
@@ -473,7 +442,10 @@ public void testZeroMaxInflightBytes() throws Exception {
473442
@Test
474443
public void testOneMaxInflightRequests() throws Exception {
475444
StreamWriterV2 writer =
476-
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(1).build();
445+
StreamWriterV2.newBuilder(TEST_STREAM, client)
446+
.setWriterSchema(createProtoSchema())
447+
.setMaxInflightRequests(1)
448+
.build();
477449
// Server will sleep 1 second before every response.
478450
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
479451
testBigQueryWrite.addResponse(createAppendResponse(0));
@@ -489,7 +461,10 @@ public void testOneMaxInflightRequests() throws Exception {
489461
@Test
490462
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
491463
StreamWriterV2 writer =
492-
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(1).build();
464+
StreamWriterV2.newBuilder(TEST_STREAM, client)
465+
.setWriterSchema(createProtoSchema())
466+
.setMaxInflightBytes(1)
467+
.build();
493468
// Server will sleep 100ms before every response.
494469
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
495470
long appendCount = 10;
@@ -500,7 +475,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
500475
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
501476
long appendStartTimeMs = System.currentTimeMillis();
502477
for (int i = 0; i < appendCount; i++) {
503-
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
478+
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
504479
}
505480
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
506481
assertTrue(appendElapsedMs >= 1000);

samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.api.core.ApiFuture;
2121
import com.google.api.core.ApiFutureCallback;
2222
import com.google.api.core.ApiFutures;
23-
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
2423
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
2524
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
2625
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
@@ -35,7 +34,6 @@
3534
import com.google.common.util.concurrent.MoreExecutors;
3635
import com.google.protobuf.Descriptors.Descriptor;
3736
import com.google.protobuf.Descriptors.DescriptorValidationException;
38-
import com.google.protobuf.Int64Value;
3937
import com.google.protobuf.Message;
4038
import java.io.IOException;
4139
import java.time.Duration;
@@ -163,16 +161,19 @@ private void writeToStream(
163161
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
164162
writeStream.getTableSchema());
165163
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
166-
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
164+
try (StreamWriterV2 writer =
165+
StreamWriterV2.newBuilder(writeStream.getName())
166+
.setWriterSchema(protoSchema)
167+
.setTraceId("SAMPLE:parallel_append")
168+
.build()) {
167169
while (System.currentTimeMillis() < deadlineMillis) {
168170
synchronized (this) {
169171
if (error != null) {
170172
// Stop writing once we get an error.
171173
throw error;
172174
}
173175
}
174-
ApiFuture<AppendRowsResponse> future =
175-
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
176+
ApiFuture<AppendRowsResponse> future = writer.append(createAppendRows(descriptor), -1);
176177
synchronized (this) {
177178
inflightCount++;
178179
}
@@ -197,8 +198,7 @@ private void waitForInflightToReachZero(Duration timeout) {
197198
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
198199
}
199200

200-
private AppendRowsRequest createAppendRequest(
201-
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
201+
private ProtoRows createAppendRows(Descriptor descriptor) {
202202
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
203203
for (int i = 0; i < BATCH_SIZE; i++) {
204204
byte[] payload = new byte[ROW_SIZE];
@@ -208,15 +208,7 @@ private AppendRowsRequest createAppendRequest(
208208
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
209209
rowsBuilder.addSerializedRows(protoMessage.toByteString());
210210
}
211-
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
212-
data.setWriterSchema(protoSchema);
213-
data.setRows(rowsBuilder.build());
214-
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
215-
request.setWriteStream(streamName);
216-
if (offset >= 0) {
217-
request.setOffset(Int64Value.of(offset));
218-
}
219-
return request.build();
211+
return rowsBuilder.build();
220212
}
221213

222214
private void sleepIgnoringInterruption(Duration duration) {

0 commit comments

Comments
 (0)