Skip to content

Commit

Permalink
fix: if new_partitions is size 0, do not enforce size check (#1673)
Browse files Browse the repository at this point in the history
Do not enforce new_partitions and change_stream_continuation_tokens to be the same size if new_partitions has size of 0 because Cloud Bigtable backend may not be updated to serve new_partitions field yet.

`new_partitions` is a new field and the backend may not be serving this field.

Change-Id: Id21c293b92c304f05b905ca8e8b3988b9241866e

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
tonytanger committed Mar 21, 2023
1 parent b1f669d commit 07bcfd9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Expand Up @@ -48,7 +48,8 @@ private static CloseStream create(
!changeStreamContinuationTokens.isEmpty(),
"A non-OK CloseStream should have continuation token(s).");
Preconditions.checkState(
changeStreamContinuationTokens.size() == newPartitions.size(),
newPartitions.size() == 0
|| changeStreamContinuationTokens.size() == newPartitions.size(),
"Number of continuation tokens does not match number of new partitions.");
}
return new AutoValue_CloseStream(
Expand Down
Expand Up @@ -236,9 +236,45 @@ public void closeStreamTokenAndNewPartitionCountMismatchedTest() {
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that number of continuation tokens and new partitions don't need to match if new
// partitions is empty.
@Test
public void closeStreamTokenAndZeroNewPartitionMismatchNoExceptionTest()
throws IOException, ClassNotFoundException {
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(closeStream);
oos.close();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
CloseStream actual = (CloseStream) ois.readObject();
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}
}

0 comments on commit 07bcfd9

Please sign in to comment.