From 9e1110600dc64defcd9143753f45b5b8226aa339 Mon Sep 17 00:00:00 2001 From: tengzhonger <109308630+tengzhonger@users.noreply.github.com> Date: Fri, 24 Feb 2023 11:48:12 -0500 Subject: [PATCH] fix: Fix StackOverflow in ChangeStreamStateMachine due to excessive mods (#1648) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../ChangeStreamStateMachine.java | 306 ++++++++---------- .../ChangeStreamStateMachineTest.java | 36 +++ 2 files changed, 170 insertions(+), 172 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java index 9654b4da30..2aa9c537db 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -172,11 +172,9 @@ void handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { *
*
Valid states: *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} - *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} - *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
{@link ChangeStreamStateMachine#AWAITING_NEW_DATA_CHANGE} *
Resulting states: - *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} - *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
{@link ChangeStreamStateMachine#AWAITING_NEW_DATA_CHANGE} *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} *
* @@ -188,7 +186,7 @@ void handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { try { numDataChanges++; - currentState = currentState.handleMod(dataChange, 0); + currentState = currentState.handleDataChange(dataChange); } catch (RuntimeException e) { currentState = ERROR; throw e; @@ -268,18 +266,17 @@ ChangeStreamStateMachine.State handleCloseStream( } /** - * Accepts a new mod and transitions to the next state. A mod could be a DeleteFamily, a - * DeleteColumn, or a SetCell. + * Accepts a new DataChange and transitions to the next state. A DataChange can have multiple + * mods, where each mod could be a DeleteFamily, a DeleteColumn, or a SetCell. * * @param dataChange The DataChange that holds the new mod to process. - * @param index The index of the mod in the DataChange. * @return The next state. - * @throws IllegalStateException If the subclass can't handle the mod. + * @throws IllegalStateException If the subclass can't handle the DataChange. * @throws ChangeStreamStateMachine.InvalidInputException If the subclass determines that this * dataChange is invalid. */ - ChangeStreamStateMachine.State handleMod( - ReadChangeStreamResponse.DataChange dataChange, int index) { + ChangeStreamStateMachine.State handleDataChange( + ReadChangeStreamResponse.DataChange dataChange) { throw new IllegalStateException(); } } @@ -292,7 +289,8 @@ ChangeStreamStateMachine.State handleMod( *
*
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}, in case of a Heartbeat * or a CloseStream. - *
Same as {@link ChangeStreamStateMachine#AWAITING_NEW_MOD}, depending on the DataChange. + *
Same as {@link ChangeStreamStateMachine#AWAITING_NEW_DATA_CHANGE}, depending on the + * DataChange. *
*/ private final State AWAITING_NEW_STREAM_RECORD = @@ -316,7 +314,7 @@ State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { } @Override - State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { validate( completeChangeStreamRecord == null, "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); @@ -326,9 +324,6 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { validate( dataChange.hasCommitTimestamp(), "AWAITING_NEW_STREAM_RECORD: First data change missing commit timestamp."); - validate( - index == 0, - "AWAITING_NEW_STREAM_RECORD: First data change should start with the first mod."); validate( dataChange.getChunksCount() > 0, "AWAITING_NEW_STREAM_RECORD: First data change missing mods."); @@ -356,161 +351,138 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { } else { validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType()); } - return AWAITING_NEW_MOD.handleMod(dataChange, index); + return AWAITING_NEW_DATA_CHANGE.handleDataChange(dataChange); } }; /** - * A state to handle the next Mod. + * A state to handle the next DataChange. * *
*
Valid exit states: - *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current mod is added, and we have more - * mods to expect. - *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current mod is the first chunk of a - * chunked SetCell. - *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current mod is the last - * mod of the current logical mutation. + *
{@link ChangeStreamStateMachine#AWAITING_NEW_DATA_CHANGE}. All mods from the current + * DataChange are added, and more DataChanges are expected. + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current DataChange is + * the last DataChange of the current logical mutation. *
*/ - private final State AWAITING_NEW_MOD = + private final State AWAITING_NEW_DATA_CHANGE = new State() { @Override State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { throw new IllegalStateException( - "AWAITING_NEW_MOD: Can't handle a Heartbeat in the middle of building a ChangeStreamMutation."); + "AWAITING_NEW_DATA_CHANGE: Can't handle a Heartbeat in the middle of building a ChangeStreamMutation."); } @Override State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { throw new IllegalStateException( - "AWAITING_NEW_MOD: Can't handle a CloseStream in the middle of building a ChangeStreamMutation."); + "AWAITING_NEW_DATA_CHANGE: Can't handle a CloseStream in the middle of building a ChangeStreamMutation."); } @Override - State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { - validate( - 0 <= index && index <= dataChange.getChunksCount() - 1, - "AWAITING_NEW_MOD: Index out of bound."); - ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); - Mutation mod = chunk.getMutation(); - // Case 1: SetCell - if (mod.hasSetCell()) { - // Start the Cell and delegate to AWAITING_CELL_VALUE to add the cell value. - Mutation.SetCell setCell = chunk.getMutation().getSetCell(); - if (chunk.hasChunkInfo()) { - // If it has chunk info, it must be the first chunk of a chunked SetCell. - validate( - chunk.getChunkInfo().getChunkedValueOffset() == 0, - "AWAITING_NEW_MOD: First chunk of a chunked cell must start with offset==0."); - validate( - chunk.getChunkInfo().getChunkedValueSize() > 0, - "AWAITING_NEW_MOD: First chunk of a chunked cell must have a positive chunked value size."); - expectedTotalSizeOfChunkedSetCell = chunk.getChunkInfo().getChunkedValueSize(); - actualTotalSizeOfChunkedSetCell = 0; + State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { + // Iterate over all mods. 
+ for (int index = 0; index < dataChange.getChunksCount(); ++index) { + ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); + Mutation mod = chunk.getMutation(); + // Case 1: SetCell + if (mod.hasSetCell()) { + Mutation.SetCell setCell = chunk.getMutation().getSetCell(); + // Case 1_1: Current SetCell is NOT chunked, in which case there is no ChunkInfo. + if (!chunk.hasChunkInfo()) { + builder.startCell( + setCell.getFamilyName(), + setCell.getColumnQualifier(), + setCell.getTimestampMicros()); + numCellChunks++; + builder.cellValue(setCell.getValue()); + builder.finishCell(); + continue; + } else { + // Case 1_2: This chunk is from a chunked SetCell, which must be one of the + // following: + // Case 1_2_1: The first chunk of a chunked SetCell. For example: SetCell_chunk_1 + // in + // [ReadChangeStreamResponse1: {..., SetCell_chunk_1}, ReadChangeStreamResponse2: + // {SetCell_chunk_2, ...}]. + // Case 1_2_2: A non-first chunk from a chunked SetCell. For example: + // SetCell_chunk_2 in + // [ReadChangeStreamResponse1: {..., SetCell_chunk_1}, ReadChangeStreamResponse2: + // {SetCell_chunk_2, ...}]. Note that in this case this chunk must be the first + // chunk for the current DataChange, because a SetCell can NOT be chunked within + // the same DataChange, i.e. there is no such DataChange as + // [ReadChangeStreamResponse: {SetCell_chunk_1, SetCell_chunk_2}]. 
+ if (chunk.getChunkInfo().getChunkedValueOffset() == 0) { + // Case 1_2_1 + validate( + chunk.getChunkInfo().getChunkedValueSize() > 0, + "AWAITING_NEW_DATA_CHANGE: First chunk of a chunked cell must have a positive chunked value size."); + expectedTotalSizeOfChunkedSetCell = chunk.getChunkInfo().getChunkedValueSize(); + actualTotalSizeOfChunkedSetCell = 0; + builder.startCell( + setCell.getFamilyName(), + setCell.getColumnQualifier(), + setCell.getTimestampMicros()); + } else { + // Case 1_2_2 + validate( + index == 0, + "AWAITING_NEW_DATA_CHANGE: Non-first chunked SetCell must be the first mod of a DataChange."); + } + // Concatenate the cell value of this mod into the builder. + validate( + chunk.getChunkInfo().getChunkedValueSize() == expectedTotalSizeOfChunkedSetCell, + "AWAITING_NEW_DATA_CHANGE: Chunked cell value size must be the same for all chunks."); + numCellChunks++; + builder.cellValue(setCell.getValue()); + actualTotalSizeOfChunkedSetCell += setCell.getValue().size(); + // If it's the last chunk of the chunked SetCell, finish the cell. + if (chunk.getChunkInfo().getLastChunk()) { + builder.finishCell(); + validate( + actualTotalSizeOfChunkedSetCell == expectedTotalSizeOfChunkedSetCell, + "Chunked value size in ChunkInfo doesn't match the actual total size. " + + "Expected total size: " + + expectedTotalSizeOfChunkedSetCell + + "; actual total size: " + + actualTotalSizeOfChunkedSetCell); + continue; + } else { + // If this is not the last chunk of a chunked SetCell, then this must be the last + // mod of the current response, and we're expecting the rest of the chunked cells + // in the following ReadChangeStream response. 
+ validate( + index == dataChange.getChunksCount() - 1, + "AWAITING_NEW_DATA_CHANGE: Current mod is a chunked SetCell " + + "but not the last chunk, but it's not the last mod of the current response."); + return AWAITING_NEW_DATA_CHANGE; + } + } } - builder.startCell( - setCell.getFamilyName(), - setCell.getColumnQualifier(), - setCell.getTimestampMicros()); - return AWAITING_CELL_VALUE.handleMod(dataChange, index); - } - // Case 2: DeleteFamily - if (mod.hasDeleteFromFamily()) { - numNonCellMods++; - builder.deleteFamily(mod.getDeleteFromFamily().getFamilyName()); - return checkAndFinishMutationIfNeeded(dataChange, index + 1); - } - // Case 3: DeleteCell - if (mod.hasDeleteFromColumn()) { - numNonCellMods++; - builder.deleteCells( - mod.getDeleteFromColumn().getFamilyName(), - mod.getDeleteFromColumn().getColumnQualifier(), - TimestampRange.create( - mod.getDeleteFromColumn().getTimeRange().getStartTimestampMicros(), - mod.getDeleteFromColumn().getTimeRange().getEndTimestampMicros())); - return checkAndFinishMutationIfNeeded(dataChange, index + 1); - } - throw new IllegalStateException("AWAITING_NEW_MOD: Unexpected mod type"); - } - }; - - /** - * A state that represents a cell's value continuation. - * - *
- *
Valid exit states: - *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current chunked SetCell is added, and - * we have more mods to expect. - *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current chunked SetCell has more - * cell values to expect. - *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current chunked SetCell - * is the last mod of the current logical mutation. - *
- */ - private final State AWAITING_CELL_VALUE = - new State() { - @Override - State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { - throw new IllegalStateException( - "AWAITING_CELL_VALUE: Can't handle a Heartbeat in the middle of building a SetCell."); - } - - @Override - State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { - throw new IllegalStateException( - "AWAITING_CELL_VALUE: Can't handle a CloseStream in the middle of building a SetCell."); - } - - @Override - State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { - validate( - 0 <= index && index <= dataChange.getChunksCount() - 1, - "AWAITING_CELL_VALUE: Index out of bound."); - ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); - validate( - chunk.getMutation().hasSetCell(), - "AWAITING_CELL_VALUE: Current mod is not a SetCell."); - Mutation.SetCell setCell = chunk.getMutation().getSetCell(); - numCellChunks++; - builder.cellValue(setCell.getValue()); - // Case 1: Current SetCell is chunked. For example: [ReadChangeStreamResponse1: - // {DeleteColumn, DeleteFamily, SetCell_1}, ReadChangeStreamResponse2: {SetCell_2, - // DeleteFamily}]. - if (chunk.hasChunkInfo()) { - validate( - chunk.getChunkInfo().getChunkedValueSize() > 0, - "AWAITING_CELL_VALUE: Chunked value size must be positive."); - validate( - chunk.getChunkInfo().getChunkedValueSize() == expectedTotalSizeOfChunkedSetCell, - "AWAITING_CELL_VALUE: Chunked value size must be the same for all chunks."); - actualTotalSizeOfChunkedSetCell += setCell.getValue().size(); - // If it's the last chunk of the chunked SetCell, finish the cell. - if (chunk.getChunkInfo().getLastChunk()) { - builder.finishCell(); - validate( - actualTotalSizeOfChunkedSetCell == expectedTotalSizeOfChunkedSetCell, - "Chunked value size in ChunkInfo doesn't match the actual total size. 
" - + "Expected total size: " - + expectedTotalSizeOfChunkedSetCell - + "; actual total size: " - + actualTotalSizeOfChunkedSetCell); - return checkAndFinishMutationIfNeeded(dataChange, index + 1); - } else { - // If this is not the last chunk of a chunked SetCell, then this must be the last mod - // of the current response, and we're expecting the rest of the chunked cells in the - // following ReadChangeStream response. - validate( - index == dataChange.getChunksCount() - 1, - "AWAITING_CELL_VALUE: Current mod is a chunked SetCell " - + "but not the last chunk, but it's not the last mod of the current response."); - return AWAITING_CELL_VALUE; + // Case 2: DeleteFamily + if (mod.hasDeleteFromFamily()) { + numNonCellMods++; + builder.deleteFamily(mod.getDeleteFromFamily().getFamilyName()); + continue; + } + // Case 3: DeleteCell + if (mod.hasDeleteFromColumn()) { + numNonCellMods++; + builder.deleteCells( + mod.getDeleteFromColumn().getFamilyName(), + mod.getDeleteFromColumn().getColumnQualifier(), + TimestampRange.create( + mod.getDeleteFromColumn().getTimeRange().getStartTimestampMicros(), + mod.getDeleteFromColumn().getTimeRange().getEndTimestampMicros())); + continue; } + throw new IllegalStateException("AWAITING_NEW_DATA_CHANGE: Unexpected mod type"); } - // Case 2: Current SetCell is not chunked. - builder.finishCell(); - return checkAndFinishMutationIfNeeded(dataChange, index + 1); + + // After adding all mods from this DataChange to the state machine, finish the current + // logical mutation, or wait for the next DataChange response. 
+ return checkAndFinishMutationIfNeeded(dataChange); } }; @@ -535,7 +507,7 @@ State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { } @Override - State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { throw new IllegalStateException( "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record."); } @@ -558,39 +530,29 @@ State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { } @Override - State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { throw new IllegalStateException("ERROR: Failed to handle DataChange."); } }; /** - * Check if we should continue handling mods in the current DataChange or wrap up. There are 3 - * cases: + * Check if we should continue handling DataChanges in the following responses or wrap up. There + * are 2 cases: * * */ - private State checkAndFinishMutationIfNeeded( - ReadChangeStreamResponse.DataChange dataChange, int index) { - validate( - 0 <= index && index <= dataChange.getChunksCount(), - "checkAndFinishMutationIfNeeded: index out of bound."); - // Case 1): Handle the next mod. - if (index < dataChange.getChunksCount()) { - return AWAITING_NEW_MOD.handleMod(dataChange, index); - } - // If we reach here, it means that all the mods in this DataChange have been handled. We should + private State checkAndFinishMutationIfNeeded(ReadChangeStreamResponse.DataChange dataChange) { + // This function is called when all the mods in this DataChange have been handled. We should // finish up the logical mutation or wait for more mods in the next ReadChangeStreamResponse, // depending on whether the current response is the last response for the logical mutation. if (dataChange.getDone()) { - // Case 2_1): Current change stream mutation is complete. 
+ // Case 1: Current change stream mutation is complete. validate(!dataChange.getToken().isEmpty(), "Last data change missing token"); validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark"); completeChangeStreamRecord = @@ -601,10 +563,10 @@ private State checkAndFinishMutationIfNeeded( dataChange.getEstimatedLowWatermark().getNanos())); return AWAITING_STREAM_RECORD_CONSUME; } - // Case 2_2): The current DataChange itself is chunked, so wait for the next - // ReadChangeStreamResponse. Note that we should wait for the new mods instead + // Case 2: The current DataChange itself is chunked, so wait for the next + // ReadChangeStreamResponse. Note that we should wait for the new data change instead // of for the new change stream record since the current record hasn't finished yet. - return AWAITING_NEW_MOD; + return AWAITING_NEW_DATA_CHANGE; } private void validate(boolean condition, String message) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java index d86df91c35..b51194f969 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java @@ -17,9 +17,13 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.DefaultChangeStreamRecordAdapter; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import org.junit.Before; 
import org.junit.Test; import org.junit.runner.RunWith; @@ -29,6 +33,30 @@ public class ChangeStreamStateMachineTest { ChangeStreamStateMachine changeStreamStateMachine; + private ReadChangeStreamResponse.DataChange createDataChangeWithDeleteFamilyMods( + int numDeleteFamilyMod) { + ReadChangeStreamResponse.DataChange.Builder dataChangeBuilder = + ReadChangeStreamResponse.DataChange.newBuilder() + .setType(ReadChangeStreamResponse.DataChange.Type.USER) + .setSourceClusterId("fake-source-cluster-id") + .setRowKey(ByteString.copyFromUtf8("key")) + .setCommitTimestamp(Timestamp.newBuilder().setSeconds(100).build()) + .setTiebreaker(100); + for (int i = 0; i < numDeleteFamilyMod; ++i) { + Mutation deleteFromFamily = + Mutation.newBuilder() + .setDeleteFromFamily( + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family-" + i).build()) + .build(); + dataChangeBuilder.addChunks( + ReadChangeStreamResponse.MutationChunk.newBuilder().setMutation(deleteFromFamily)); + } + dataChangeBuilder.setDone(true); + dataChangeBuilder.setEstimatedLowWatermark(Timestamp.newBuilder().setSeconds(1).build()); + dataChangeBuilder.setToken("fake-token"); + return dataChangeBuilder.build(); + } + @Before public void setUp() throws Exception { changeStreamStateMachine = @@ -58,4 +86,12 @@ public void testErrorHandlingStats() { assertThat(actualError).hasMessageThat().contains("numCellChunks: 0"); assertThat(actualError).hasMessageThat().contains("actualTotalSizeOfChunkedSetCell: 0"); } + + @Test + public void testNoStackOverflowForManyMods() { + ReadChangeStreamResponse.DataChange dataChange = createDataChangeWithDeleteFamilyMods(500000); + changeStreamStateMachine.handleDataChange(dataChange); + ChangeStreamRecord result = changeStreamStateMachine.consumeChangeStreamRecord(); + assertThat(result).isInstanceOf(ChangeStreamMutation.class); + } }