Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add getNewPartitions method to CloseStream for Bigtable ChangeStream #1655

Merged
merged 4 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- add new method is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getNewPartitions*</method>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
Expand All @@ -35,8 +37,22 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {

private static CloseStream create(
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
List<ByteStringRange> newPartitions) {
if (status.getCode() == 0) {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkState(
changeStreamContinuationTokens.isEmpty(),
"An OK CloseStream should not have continuation tokens.");
} else {
Preconditions.checkState(
!changeStreamContinuationTokens.isEmpty(),
"A non-OK CloseStream should have continuation token(s).");
Preconditions.checkState(
changeStreamContinuationTokens.size() == newPartitions.size(),
"Number of continuation tokens does not match number of new partitions.");
}
return new AutoValue_CloseStream(
Status.fromProto(status), changeStreamContinuationTokens, newPartitions);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand All @@ -46,6 +62,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
closeStream.getStatus(),
closeStream.getContinuationTokensList().stream()
.map(ChangeStreamContinuationToken::fromProto)
.collect(ImmutableList.toImmutableList()),
closeStream.getNewPartitionsList().stream()
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
.map(
newPartition ->
ByteStringRange.create(
newPartition.getRowRange().getStartKeyClosed(),
newPartition.getRowRange().getEndKeyOpen()))
.collect(ImmutableList.toImmutableList()));
}

Expand All @@ -56,4 +79,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ChangeStreamContinuationToken> getChangeStreamContinuationTokens();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ByteStringRange> getNewPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ public StreamResumptionStrategy<ReadChangeStreamRequest, ChangeStreamRecordT> cr
public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) {
// Update the token from a Heartbeat or a ChangeStreamMutation.
// We don't worry about resumption after CloseStream, since the server
// will return an OK status right after sending a CloseStream.
// will close the stream with an OK status right after sending a CloseStream,
// no matter what status the CloseStream.Status is:
// 1) ... => CloseStream.Ok => final OK. This means the read finishes successfully.
// 2) ... => CloseStream.Error => final OK. This means the client should start
// a new ReadChangeStream call with the continuation tokens specified in
// CloseStream.
// Either case, we don't need to retry after receiving a CloseStream.
if (changeStreamRecordAdapter.isHeartbeat(response)) {
this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response);
} else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
public class ChangeStreamRecordTest {

@Rule public ExpectedException expect = ExpectedException.none();

@Test
public void heartbeatSerializationTest() throws IOException, ClassNotFoundException {
ReadChangeStreamResponse.Heartbeat heartbeatProto =
Expand All @@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept

@Test
public void closeStreamSerializationTest() throws IOException, ClassNotFoundException {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -85,6 +91,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -98,6 +106,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}

@Test
Expand Down Expand Up @@ -129,7 +138,7 @@ public void heartbeatTest() {

@Test
public void closeStreamTest() {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -154,6 +163,8 @@ public void closeStreamTest() {
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -169,5 +180,65 @@ public void closeStreamTest() {
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
assertThat(token2)
.isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken());
assertThat(actualCloseStream.getNewPartitions().get(0))
.isEqualTo(
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
assertThat(actualCloseStream.getNewPartitions().get(1))
.isEqualTo(
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
}

// Tests that an OK CloseStream should not have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamOkWithContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(0).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that a non-OK CloseStream should have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamErrorWithoutContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(11).build();
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that the number of continuation tokens should match the number of new partitions.
@Test(expected = IllegalStateException.class)
public void closeStreamTokenAndNewPartitionCountMismatchedTest() {
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ public void heartbeatTest() {
public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream expectedCloseStream =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder().setToken("random-token").build())
.setStatus(Status.newBuilder().setCode(0).build())
.build();
assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(streamContinuationToken)
.setStatus(Status.newBuilder().setCode(0).build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.setStatus(Status.newBuilder().setCode(11))
.build();
ReadChangeStreamResponse response =
ReadChangeStreamResponse.newBuilder().setCloseStream(closeStreamProto).build();
Expand All @@ -127,5 +128,8 @@ public void closeStreamTest() {
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
assertThat(changeStreamContinuationToken.getToken())
.isEqualTo(streamContinuationToken.getToken());
assertThat(closeStream.getNewPartitions().size()).isEqualTo(1);
assertThat(closeStream.getNewPartitions().get(0))
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile;
Expand Down Expand Up @@ -173,6 +174,14 @@ public void test() throws Exception {
.setToken(token.getToken())
.build());
}
for (ByteStringRange newPartition : closeStream.getNewPartitions()) {
builder.addNewPartitions(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(newPartition.getStart())
.setEndKeyOpen(newPartition.getEnd())));
}
ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
actualResults.add(
ReadChangeStreamTest.Result.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to
.build();
}

private StreamPartition createNewPartitionForCloseStream() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
.setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)))
.build();
}

private ReadChangeStreamResponse.Heartbeat createHeartbeat(
StreamContinuationToken streamContinuationToken) {
return ReadChangeStreamResponse.Heartbeat.newBuilder()
Expand All @@ -130,11 +139,18 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat(
.build();
}

private ReadChangeStreamResponse.CloseStream createCloseStream() {
return ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.setStatus(com.google.rpc.Status.newBuilder().setCode(0).build())
.build();
private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) {
ReadChangeStreamResponse.CloseStream.Builder builder =
ReadChangeStreamResponse.CloseStream.newBuilder();
if (isOk) {
builder.setStatus(com.google.rpc.Status.newBuilder().setCode(0));
} else {
builder
.setStatus(com.google.rpc.Status.newBuilder().setCode(11))
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.addNewPartitions(createNewPartitionForCloseStream());
}
return builder.build();
}

private ReadChangeStreamResponse.DataChange createDataChange(boolean done) {
Expand Down Expand Up @@ -178,7 +194,7 @@ public void happyPathHeartbeatTest() {
@Test
public void happyPathCloseStreamTest() {
ReadChangeStreamResponse closeStreamResponse =
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(true)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse));
List<ChangeStreamRecord> actualResults = getResults();
Expand Down Expand Up @@ -221,7 +237,7 @@ public void singleHeartbeatImmediateRetryTest() {
public void singleCloseStreamImmediateRetryTest() {
// CloseStream.
ReadChangeStreamResponse closeStreamResponse =
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(false)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE));
// Resume with the exact same request.
Expand Down
Loading