Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 8caf5a2

Browse files
authored
fix: enable tests that are disabled due to breaking change and stop ignoring ALREADY_EXISTED error (#748)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️
1 parent 7056a5f commit 8caf5a2

File tree

5 files changed

+325
-326
lines changed

5 files changed

+325
-326
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
import com.google.api.gax.rpc.StreamController;
3737
import com.google.api.gax.rpc.TransportChannelProvider;
3838
import com.google.auth.oauth2.GoogleCredentials;
39-
import com.google.cloud.bigquery.storage.v1beta2.StorageProto.*;
4039
import com.google.common.base.Preconditions;
40+
import com.google.protobuf.Int64Value;
4141
import io.grpc.Status;
4242
import io.grpc.StatusRuntimeException;
4343
import java.io.IOException;
@@ -412,7 +412,7 @@ private static final class InflightBatch {
412412
this.inflightRequests = inflightRequests;
413413
this.offsetList = new ArrayList<Long>(inflightRequests.size());
414414
for (AppendRequestAndFutureResponse request : inflightRequests) {
415-
if (request.message.getOffset().getValue() > 0) {
415+
if (request.message.hasOffset()) {
416416
offsetList.add(new Long(request.message.getOffset().getValue()));
417417
} else {
418418
offsetList.add(new Long(-1));
@@ -485,17 +485,15 @@ private void onFailure(Throwable t) {
485485
private void onSuccess(AppendRowsResponse response) {
486486
for (int i = 0; i < inflightRequests.size(); i++) {
487487
AppendRowsResponse.Builder singleResponse = response.toBuilder();
488-
// if (offsetList.get(i) > 0) {
489-
// singleResponse.setOffset(offsetList.get(i));
490-
// } else {
491-
// long actualOffset = response.getOffset();
492-
// for (int j = 0; j < i; j++) {
493-
// actualOffset +=
494-
//
495-
// inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
496-
// }
497-
// singleResponse.setOffset(actualOffset);
498-
// }
488+
if (response.getAppendResult().hasOffset()) {
489+
long actualOffset = response.getAppendResult().getOffset().getValue();
490+
for (int j = 0; j < i; j++) {
491+
actualOffset +=
492+
inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
493+
}
494+
singleResponse.setAppendResult(
495+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset)));
496+
}
499497
inflightRequests.get(i).appendResult.set(singleResponse.build());
500498
}
501499
}
@@ -850,27 +848,28 @@ public void onResponse(AppendRowsResponse response) {
850848
}
851849
// Currently there is nothing retryable. If the error is already exists, then ignore it.
852850
if (response.hasError()) {
853-
if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) {
854-
StatusRuntimeException exception =
855-
new StatusRuntimeException(
856-
Status.fromCodeValue(response.getError().getCode())
857-
.withDescription(response.getError().getMessage()));
851+
StatusRuntimeException exception =
852+
new StatusRuntimeException(
853+
Status.fromCodeValue(response.getError().getCode())
854+
.withDescription(response.getError().getMessage()));
855+
inflightBatch.onFailure(exception);
856+
} else {
857+
if (inflightBatch.getExpectedOffset() > 0
858+
&& (response.getAppendResult().hasOffset()
859+
&& response.getAppendResult().getOffset().getValue()
860+
!= inflightBatch.getExpectedOffset())) {
861+
IllegalStateException exception =
862+
new IllegalStateException(
863+
String.format(
864+
"The append result offset %s does not match " + "the expected offset %s.",
865+
response.getAppendResult().getOffset().getValue(),
866+
inflightBatch.getExpectedOffset()));
858867
inflightBatch.onFailure(exception);
868+
abortInflightRequests(exception);
869+
} else {
870+
inflightBatch.onSuccess(response);
859871
}
860872
}
861-
// Temp for Breaking Change.
862-
// if (inflightBatch.getExpectedOffset() > 0
863-
// && response.getOffset() != inflightBatch.getExpectedOffset()) {
864-
// IllegalStateException exception =
865-
// new IllegalStateException(
866-
// String.format(
867-
// "The append result offset %s does not match " + "the expected offset %s.",
868-
// response.getOffset(), inflightBatch.getExpectedOffset()));
869-
// inflightBatch.onFailure(exception);
870-
// abortInflightRequests(exception);
871-
// } else {
872-
inflightBatch.onSuccess(response);
873-
// }
874873
} finally {
875874
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
876875
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ public void testComplicateSchemaWithPendingStream()
426426
.setParent(tableId2)
427427
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
428428
.build());
429+
FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance();
429430
try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
430431
LOG.info("Sending two messages");
431432
ApiFuture<AppendRowsResponse> response =
@@ -449,24 +450,25 @@ public void testComplicateSchemaWithPendingStream()
449450
Iterator<FieldValueList> iter = result.getValues().iterator();
450451
assertEquals(false, iter.hasNext());
451452

452-
FinalizeWriteStreamResponse finalizeResponse =
453+
finalizeResponse =
453454
client.finalizeWriteStream(
454455
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
455456

456457
ApiFuture<AppendRowsResponse> response3 =
457458
streamWriter.append(
458459
createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
459-
.setOffset(Int64Value.of(1L))
460+
.setOffset(Int64Value.of(2L))
460461
.build());
461462
try {
462463
assertEquals(2, response3.get().getOffset());
463464
fail("Append to finalized stream should fail.");
464465
} catch (Exception expected) {
465466
// The exception thrown is not stable. Opened a bug to fix it.
467+
LOG.info("Got exception: " + expected.toString());
466468
}
467469
}
468470
// Finalize row count is not populated.
469-
// assertEquals(1, finalizeResponse.getRowCount());
471+
assertEquals(2, finalizeResponse.getRowCount());
470472
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
471473
client.batchCommitWriteStreams(
472474
BatchCommitWriteStreamsRequest.newBuilder()

0 commit comments

Comments
 (0)