From 9b23a29164f8820de0928017968f6146502f6943 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Tue, 28 Feb 2023 16:59:46 -0500 Subject: [PATCH 1/4] feat: Add getNewPartitions method to CloseStream for Bigtable ChangeStream --- .../clirr-ignored-differences.xml | 6 ++ .../bigtable/data/v2/models/CloseStream.java | 18 ++++- .../v2/models/ChangeStreamRecordTest.java | 11 +++ ...ChangeStreamRecordMergingCallableTest.java | 4 + ...ReadChangeStreamMergingAcceptanceTest.java | 9 +++ .../ReadChangeStreamRetryTest.java | 10 +++ .../src/test/resources/changestream.json | 80 ++++++++++++++++++- 7 files changed, 132 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index a0ffe39bd1..da5feada67 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -100,6 +100,12 @@ *getStatus* com.google.cloud.bigtable.common.Status + + + 7013 + com/google/cloud/bigtable/data/v2/models/CloseStream + *getNewPartitions* + 7006 diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index d5e121e664..0bfb90bafc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.cloud.bigtable.common.Status; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; @@ -35,8 +36,10 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable { private static CloseStream create( com.google.rpc.Status status, - List changeStreamContinuationTokens) { - return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens); + List changeStreamContinuationTokens, + List newPartitions) { + return new AutoValue_CloseStream( + Status.fromProto(status), changeStreamContinuationTokens, newPartitions); } /** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */ @@ -46,6 +49,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea closeStream.getStatus(), closeStream.getContinuationTokensList().stream() .map(ChangeStreamContinuationToken::fromProto) + .collect(ImmutableList.toImmutableList()), + closeStream.getNewPartitionsList().stream() + .map( + newPartition -> + ByteStringRange.create( + newPartition.getRowRange().getStartKeyClosed(), + newPartition.getRowRange().getEndKeyOpen())) .collect(ImmutableList.toImmutableList())); } @@ -56,4 +66,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea @InternalApi("Intended for use by the BigtableIO in apache/beam only.") @Nonnull public abstract List getChangeStreamContinuationTokens(); + + @InternalApi("Intended for use by the BigtableIO in apache/beam only.") + @Nonnull + public abstract List getNewPartitions(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 688ce46bcf..4e5aab300f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -85,6 +85,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce .setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build()) .setToken(token2) .build()) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2)) .setStatus(status) .build(); CloseStream closeStream = CloseStream.fromProto(closeStreamProto); @@ -98,6 +100,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce assertThat(actual.getChangeStreamContinuationTokens()) .isEqualTo(closeStream.getChangeStreamContinuationTokens()); assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus()); + assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions()); } @Test @@ -154,6 +157,8 @@ public void closeStreamTest() { .setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build()) .setToken(token2) .build()) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2)) .setStatus(status) .build(); CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto); @@ -169,5 +174,11 @@ public void closeStreamTest() { ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); assertThat(token2) .isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken()); + assertThat(actualCloseStream.getNewPartitions().get(0)) + .isEqualTo( + ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen())); + assertThat(actualCloseStream.getNewPartitions().get(1)) + .isEqualTo( + ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index 736491a0af..13ef2f7189 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -102,6 +102,7 @@ public void closeStreamTest() { ReadChangeStreamResponse.CloseStream closeStreamProto = ReadChangeStreamResponse.CloseStream.newBuilder() .addContinuationTokens(streamContinuationToken) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) .setStatus(Status.newBuilder().setCode(0).build()) .build(); ReadChangeStreamResponse response = @@ -127,5 +128,8 @@ public void closeStreamTest() { .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); assertThat(changeStreamContinuationToken.getToken()) .isEqualTo(streamContinuationToken.getToken()); + assertThat(closeStream.getNewPartitions().size()).isEqualTo(1); + assertThat(closeStream.getNewPartitions().get(0)) + .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java index 67d6a99f7b..7c3243ecfe 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java @@ -38,6 +38,7 @@ import com.google.cloud.bigtable.data.v2.models.DeleteFamily; import com.google.cloud.bigtable.data.v2.models.Entry; import com.google.cloud.bigtable.data.v2.models.Heartbeat; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.SetCell; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile; @@ -173,6 +174,14 @@ public void test() throws Exception { .setToken(token.getToken()) .build()); } + for (ByteStringRange newPartition : closeStream.getNewPartitions()) { + builder.addNewPartitions( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(newPartition.getStart()) + .setEndKeyOpen(newPartition.getEnd()))); + } ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build(); actualResults.add( ReadChangeStreamTest.Result.newBuilder() diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index c994f3fc8d..2c9d441e55 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to .build(); } + private StreamPartition createNewPartitionForCloseStream() { + return StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED)) + .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN))) + .build(); + } + private ReadChangeStreamResponse.Heartbeat createHeartbeat( StreamContinuationToken streamContinuationToken) { return ReadChangeStreamResponse.Heartbeat.newBuilder() @@ -133,6 +142,7 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat( private ReadChangeStreamResponse.CloseStream createCloseStream() { return ReadChangeStreamResponse.CloseStream.newBuilder() .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) + .addNewPartitions(createNewPartitionForCloseStream()) .setStatus(com.google.rpc.Status.newBuilder().setCode(0).build()) .build(); } diff --git a/google-cloud-bigtable/src/test/resources/changestream.json b/google-cloud-bigtable/src/test/resources/changestream.json index 9d9e2d46cc..661bf1b4cb 100644 --- a/google-cloud-bigtable/src/test/resources/changestream.json +++ b/google-cloud-bigtable/src/test/resources/changestream.json @@ -61,11 +61,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } } @@ -92,11 +106,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } }, @@ -137,6 +165,14 @@ }, "token": "close-stream-token-1" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + } ] } } @@ -176,6 +212,14 @@ }, "token": "close-stream-token-1" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + } ] } }, @@ -1280,11 +1324,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } } @@ -1363,11 +1421,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } }, From d1c04b7bc96b61e02c8442f8ac87735442f67216 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 1 Mar 2023 16:04:15 -0500 Subject: [PATCH 2/4] Address comments --- .../bigtable/data/v2/models/CloseStream.java | 13 ++++ .../v2/models/ChangeStreamRecordTest.java | 64 ++++++++++++++++++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index 0bfb90bafc..221b05f587 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -20,6 +20,7 @@ import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.cloud.bigtable.common.Status; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; @@ -38,6 +39,18 @@ private static CloseStream create( com.google.rpc.Status status, List changeStreamContinuationTokens, List newPartitions) { + if (status.getCode() == 0) { + Preconditions.checkState( + changeStreamContinuationTokens.isEmpty(), + "An OK CloseStream should not have continuation tokens."); + } else { + Preconditions.checkState( + !changeStreamContinuationTokens.isEmpty(), + "A non-OK CloseStream should have continuation token(s)."); + Preconditions.checkState( + changeStreamContinuationTokens.size() == newPartitions.size(), + "Number of continuation tokens does not match number of new partitions."); + } return new AutoValue_CloseStream( Status.fromProto(status), changeStreamContinuationTokens, newPartitions); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 4e5aab300f..c00221be3d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -30,7 +30,11 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.function.ThrowingRunnable; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Instant; @@ -38,6 +42,8 @@ @RunWith(JUnit4.class) public class ChangeStreamRecordTest { + @Rule public ExpectedException expect = ExpectedException.none(); + @Test public void heartbeatSerializationTest() throws IOException, ClassNotFoundException { ReadChangeStreamResponse.Heartbeat heartbeatProto = @@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept @Test public void closeStreamSerializationTest() throws IOException, ClassNotFoundException { - Status status = Status.newBuilder().setCode(0).build(); + Status status = Status.newBuilder().setCode(11).build(); RowRange rowRange1 = RowRange.newBuilder() .setStartKeyClosed(ByteString.copyFromUtf8("")) @@ -132,7 +138,7 @@ public void heartbeatTest() { @Test public void closeStreamTest() { - Status status = Status.newBuilder().setCode(0).build(); + Status status = Status.newBuilder().setCode(11).build(); RowRange rowRange1 = RowRange.newBuilder() .setStartKeyClosed(ByteString.copyFromUtf8("")) @@ -181,4 +187,58 @@ public void closeStreamTest() { .isEqualTo( ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); } + + // Tests that an OK CloseStream should not have continuation tokens. + @Test(expected = IllegalStateException.class) + public void closeStreamOkWithContinuationTokenShouldFail() { + Status status = Status.newBuilder().setCode(0).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); + } + + // Tests that a non-OK CloseStream should have continuation tokens. + @Test(expected = IllegalStateException.class) + public void closeStreamErrorWithoutContinuationTokenShouldFail() { + Status status = Status.newBuilder().setCode(11).build(); + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); + } + + // Tests that the number of continuation tokens should match the number of new partitions. + @Test(expected = IllegalStateException.class) + public void closeStreamTokenAndNewPartitionCountMismatchedTest() { + Status status = Status.newBuilder().setCode(11).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); + } } From 579cedec79ac477d591a70b91759ebe0314e85a9 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 1 Mar 2023 16:31:57 -0500 Subject: [PATCH 3/4] Address comments --- .../ReadChangeStreamResumptionStrategy.java | 3 ++- .../DefaultChangeStreamRecordAdapterTest.java | 2 -- ...ChangeStreamRecordMergingCallableTest.java | 2 +- .../ReadChangeStreamRetryTest.java | 22 ++++++++++++------- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 660466db95..d275aef105 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -56,7 +56,8 @@ public StreamResumptionStrategy cr public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. // We don't worry about resumption after CloseStream, since the server - // will return an OK status right after sending a CloseStream. + // will close the stream with an OK status right after sending a CloseStream, + // no matter what status the CloseStream.Status is. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index 99af76fb03..22270bc269 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -150,8 +150,6 @@ public void heartbeatTest() { public void closeStreamTest() { ReadChangeStreamResponse.CloseStream expectedCloseStream = ReadChangeStreamResponse.CloseStream.newBuilder() - .addContinuationTokens( - StreamContinuationToken.newBuilder().setToken("random-token").build()) .setStatus(Status.newBuilder().setCode(0).build()) .build(); assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index 13ef2f7189..f0939fb0cf 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -103,7 +103,7 @@ public void closeStreamTest() { ReadChangeStreamResponse.CloseStream.newBuilder() .addContinuationTokens(streamContinuationToken) .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) - .setStatus(Status.newBuilder().setCode(0).build()) + .setStatus(Status.newBuilder().setCode(11)) .build(); ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder().setCloseStream(closeStreamProto).build(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index 2c9d441e55..48a62bfee8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -139,12 +139,18 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat( .build(); } - private ReadChangeStreamResponse.CloseStream createCloseStream() { - return ReadChangeStreamResponse.CloseStream.newBuilder() - .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) - .addNewPartitions(createNewPartitionForCloseStream()) - .setStatus(com.google.rpc.Status.newBuilder().setCode(0).build()) - .build(); + private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) { + ReadChangeStreamResponse.CloseStream.Builder builder = + ReadChangeStreamResponse.CloseStream.newBuilder(); + if (isOk) { + builder.setStatus(com.google.rpc.Status.newBuilder().setCode(0)); + } else { + builder + .setStatus(com.google.rpc.Status.newBuilder().setCode(11)) + .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) + .addNewPartitions(createNewPartitionForCloseStream()); + } + return builder.build(); } private ReadChangeStreamResponse.DataChange createDataChange(boolean done) { @@ -188,7 +194,7 @@ public void happyPathHeartbeatTest() { @Test public void happyPathCloseStreamTest() { ReadChangeStreamResponse closeStreamResponse = - ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(true)).build(); service.expectations.add( RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse)); List actualResults = getResults(); @@ -231,7 +237,7 @@ public void singleHeartbeatImmediateRetryTest() { public void singleCloseStreamImmediateRetryTest() { // CloseStream. ReadChangeStreamResponse closeStreamResponse = - ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(false)).build(); service.expectations.add( RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE)); // Resume with the exact same request. From 0f928d99628d75fff47eb09b8f6bd374dad618f2 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 1 Mar 2023 16:42:07 -0500 Subject: [PATCH 4/4] Address comments --- .../changestream/ReadChangeStreamResumptionStrategy.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index d275aef105..fda608eda5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -57,7 +57,12 @@ public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. // We don't worry about resumption after CloseStream, since the server // will close the stream with an OK status right after sending a CloseStream, - // no matter what status the CloseStream.Status is. + // no matter what status the CloseStream.Status is: + // 1) ... => CloseStream.Ok => final OK. This means the read finishes successfully. + // 2) ... => CloseStream.Error => final OK. This means the client should start + // a new ReadChangeStream call with the continuation tokens specified in + // CloseStream. + // Either case, we don't need to retry after receiving a CloseStream. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) {