Skip to content

Commit

Permalink
Fix the flakiness in LogReplicationFSMTest.cancelSnapshotSyncInProgre…
Browse files Browse the repository at this point in the history
…ssAndRetry (#3662)
  • Loading branch information
pankti-m committed Jul 27, 2023
1 parent 85d9ea1 commit 27f8b7c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public class TestDataSender implements DataSender {

private long snapshotSyncBaseSnapshot = 0;

// When true, send() returns its future without completing it with an ACK, so that the test can keep the FSM
// waiting in the IN_SNAPSHOT_SYNC state while it performs further operations or validations.
private boolean waitInSnapshotSync;

/**
 * Creates a test data sender.
 *
 * @param waitInSnapshotSync if true, {@code send} returns its future without replying with an ACK
 *                           until {@code reset()} clears this flag
 */
public TestDataSender(boolean waitInSnapshotSync) {
this.waitInSnapshotSync = waitInSnapshotSync;
}

@Override
public CompletableFuture<LogReplicationEntryMsg> send(LogReplicationEntryMsg message) {
if (message != null && !message.getData().isEmpty() &&
Expand All @@ -37,6 +45,12 @@ public CompletableFuture<LogReplicationEntryMsg> send(LogReplicationEntryMsg mes
}

CompletableFuture<LogReplicationEntryMsg> cf = new CompletableFuture<>();

// Do not send an ACK if the test needs to wait in IN_SNAPSHOT_SYNC state.
if (waitInSnapshotSync) {
return cf;
}

LogReplicationEntryMetadataMsg.Builder ackMetadata =
LogReplicationEntryMetadataMsg.newBuilder().mergeFrom(message.getMetadata());

Expand Down Expand Up @@ -97,6 +111,7 @@ public CompletableFuture<LogReplicationMetadataResponseMsg> sendMetadataRequest(

/**
 * Clears the queue of entries and turns off the wait-in-snapshot-sync flag, so that
 * subsequent calls to {@code send} are no longer short-circuited before the ACK is built.
 */
public void reset() {
entryQueue.clear();
// NOTE(review): this field is not volatile; if send() runs on a different thread than the caller of
// reset(), visibility of this write is not guaranteed -- confirm the threading model.
waitInSnapshotSync = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void stopAckReader() {
@Test
public void testLogReplicationFSMTransitions() throws Exception {

initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

// Initial state: Initialized
LogReplicationState initState = fsm.getState();
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testLogReplicationFSMTransitions() throws Exception {
@Test
public void testSyncStatusUpdatesForSnapshotOnInit() throws Exception {
final int updateToStatusTableFromOnEntry = 1;
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

final Table<ReplicationStatusKey, ReplicationStatusVal, Message> statusTable =
this.corfuStore.getTable(NAMESPACE, REPLICATION_STATUS_TABLE);
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testSyncStatusUpdatesForSnapshotOnInit() throws Exception {
@Test
public void testSyncStatusUpdatesForLogEntryOnInit() throws Exception {
final int updateToStatusTableFromOnEntry = 1;
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

final Table<ReplicationStatusKey, ReplicationStatusVal, Message> statusTable =
this.corfuStore.getTable(NAMESPACE, REPLICATION_STATUS_TABLE);
Expand Down Expand Up @@ -321,7 +321,7 @@ public void testSyncStatusUpdatesForLogEntryOnInit() throws Exception {
@Test
public void testSyncStatusUpdatesForSnapshot() throws Exception {
final int updateToStatusTableFromOnEntry = 3;
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

CountDownLatch statusTableLatch = new CountDownLatch(updateToStatusTableFromOnEntry);
TestStatusTableStreamListener streamListener = new TestStatusTableStreamListener(statusTableLatch);
Expand Down Expand Up @@ -373,7 +373,7 @@ public void testSyncStatusUpdatesForSnapshot() throws Exception {
@Test
public void testSyncStatusUpdatesForLogEntry() throws Exception {
final int updateToStatusTableFromOnEntry = 2;
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

CountDownLatch statusTableLatch = new CountDownLatch(updateToStatusTableFromOnEntry);
TestStatusTableStreamListener streamListener = new TestStatusTableStreamListener(statusTableLatch);
Expand Down Expand Up @@ -425,7 +425,7 @@ public void testSyncStatusUpdatesForLogEntry() throws Exception {
*/
@Test
public void testSyncStatusUpdatesForSnapshotToLogEntryTransition() throws Exception {
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

final Table<ReplicationStatusKey, ReplicationStatusVal, Message> statusTable =
this.corfuStore.getTable(NAMESPACE, REPLICATION_STATUS_TABLE);
Expand Down Expand Up @@ -481,7 +481,7 @@ public void testSyncStatusUpdatesForSnapshotToLogEntryTransition() throws Except
*/
@Test
public void testSyncStatusUpdatesForLogEntryToSnapshotTransition() throws Exception {
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

final Table<ReplicationStatusKey, ReplicationStatusVal, Message> statusTable =
this.corfuStore.getTable(NAMESPACE, REPLICATION_STATUS_TABLE);
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testSyncStatusUpdatesForLogEntryToSnapshotTransition() throws Except
*/
@Test
public void testTrimExceptionFSM() throws Exception {
initLogReplicationFSM(ReaderImplementation.EMPTY);
initLogReplicationFSM(ReaderImplementation.EMPTY, false);

// Initial acquire of the semaphore, so the occurrence of the transition releases it for the transition itself.
transitionAvailable.acquire();
Expand Down Expand Up @@ -612,7 +612,7 @@ private void testSnapshotSender(int batchSize) {
observeTransitions = false;

// Initialize State Machine
initLogReplicationFSM(ReaderImplementation.TEST);
initLogReplicationFSM(ReaderImplementation.TEST, false);

// Modify test configuration to the specified batch size
((TestSnapshotReader)snapshotReader).setBatchSize(batchSize);
Expand Down Expand Up @@ -654,25 +654,19 @@ private void testSnapshotSender(int batchSize) {
*
* (1) Write NUM_ENTRIES to StreamA
* (2) Trigger start of SNAPSHOT_SYNC
* (3) Interrupt/Stop snapshot sync when 2 messages have been sent to the remote site.
* (3) Interrupt/Stop snapshot sync when waiting for ACK for the receipt of entries
* (4) Verify that this causes the FSM to transition to INITIALIZED state
* (5) Re-trigger SNAPSHOT_SYNC
* (6) Check for completeness, i.e., that state has changed to IN_LOG_ENTRY_SYNC
*
* @throws Exception
*/
@Test
public void cancelSnapshotSyncInProgressAndRetry() throws Exception {
// This test needs to observe the number of messages generated during snapshot sync to interrupt/stop it,
// before it completes.
observeSnapshotSync = true;

// Initialize State Machine
initLogReplicationFSM(ReaderImplementation.TEST);
public void cancelSnapshotSyncInProgressAndRetry() {
observeTransitions = false;

// Modify test configuration to the specified batch size (since we write NUM_ENTRIES = 10) and we send in
// batches of BATCH_SIZE = 2, we will stop snapshot sync at 2 sent messages.
((TestSnapshotReader)snapshotReader).setBatchSize(BATCH_SIZE);
limitSnapshotMessages = 2;
// Initialize State Machine and data sender
initLogReplicationFSM(ReaderImplementation.TEST, true);

// Write NUM_ENTRIES to streamA
List<TokenResponse> writeTokens = writeToStream();
Expand All @@ -683,51 +677,39 @@ public void cancelSnapshotSyncInProgressAndRetry() throws Exception {
// Write to Stream will write to some addresses. SnapshotReader should only read from those addresses
((TestSnapshotReader) snapshotReader).setSeqNumsToRead(seqNums);

// Initial acquire of semaphore, the transition method will block until a transition occurs
transitionAvailable.acquire();
log.info("**** Transition to Snapshot Sync");

log.debug("**** Transition to Snapshot Sync");
// Transition #1: Snapshot Sync Request
transition(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST, LogReplicationStateType.IN_SNAPSHOT_SYNC);
fsm.input(new LogReplicationEvent(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST));

// We observe the number of transmitted messages and force a REPLICATION_STOP, when 2 messages have been sent
// so we verify the state moves to INITIALIZED again.
transitionAvailable.acquire();
log.debug("**** Stop Replication");
fsm.input(new LogReplicationEvent(LogReplicationEventType.REPLICATION_STOP));
// Wait till the fsm reaches IN_SNAPSHOT_SYNC state
while (fsm.getState().getType() != LogReplicationStateType.IN_SNAPSHOT_SYNC) {
log.debug("Wait for the FSM to reach IN_SNAPSHOT_SYNC state");
}
assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.IN_SNAPSHOT_SYNC);

transitionAvailable.acquire();
log.info("**** Stop Replication when in IN_SNAPSHOT_SYNC state");
fsm.input(new LogReplicationEvent(LogReplicationEventType.REPLICATION_STOP));

while (fsm.getState().getType() != LogReplicationStateType.INITIALIZED) {
// Wait on a FSM transition to occur
transitionAvailable.acquire();
log.debug("Waiting for the FSM to reach INITIALIZED state");
}

assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.INITIALIZED);

((TestDataSender) dataSender).reset();
fsm.input(new LogReplicationEvent(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST));

// Stop observing number of messages in snapshot sync, so this time it completes
observeSnapshotSync = false;

// Transition #2: This time the snapshot sync completes
UUID snapshotSyncId = transition(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST, LogReplicationStateType.IN_SNAPSHOT_SYNC, true);

while (fsm.getState().getType() != LogReplicationStateType.WAIT_SNAPSHOT_APPLY) {
// Block until FSM moves back to in log entry (delta) sync state
transitionAvailable.acquire();
while (fsm.getState().getType() != LogReplicationStateType.IN_LOG_ENTRY_SYNC) {
log.info("Waiting for the FSM to reach IN_LOG_ENTRY_SYNC state");
}

assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.WAIT_SNAPSHOT_APPLY);
assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.IN_LOG_ENTRY_SYNC);

Queue<LogReplicationEntryMsg> listenerQueue = ((TestDataSender) dataSender).getEntryQueue();

assertThat(listenerQueue.size()).isEqualTo(NUM_ENTRIES);

log.debug("**** Snapshot Sync Complete");
transition(LogReplicationEventType.SNAPSHOT_APPLY_COMPLETE, LogReplicationStateType.IN_LOG_ENTRY_SYNC, snapshotSyncId, true);

assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.IN_LOG_ENTRY_SYNC);

for (int i=0; i<NUM_ENTRIES; i++) {
assertThat(listenerQueue.poll().getData())
Expand All @@ -745,7 +727,7 @@ public void cancelSnapshotSyncInProgressAndRetry() throws Exception {
public void testSnapshotSyncStreamImplementation() throws Exception {

// Initialize State Machine
initLogReplicationFSM(ReaderImplementation.STREAMS);
initLogReplicationFSM(ReaderImplementation.STREAMS, false);

// Write LARGE_NUM_ENTRIES to streamA
writeToMap();
Expand Down Expand Up @@ -836,7 +818,7 @@ private List<TokenResponse> writeToStream() {
*
* @param readerImpl implementation to use for readers.
*/
private void initLogReplicationFSM(ReaderImplementation readerImpl) {
private void initLogReplicationFSM(ReaderImplementation readerImpl, boolean waitInSnapshotSync) {

String fullyQualifiedStreamName = TableRegistry.getFullyQualifiedTableName(TEST_NAMESPACE, TEST_STREAM_NAME);
LogEntryReader logEntryReader = new TestLogEntryReader();
Expand Down Expand Up @@ -865,12 +847,12 @@ private void initLogReplicationFSM(ReaderImplementation readerImpl) {
.batchSize(BATCH_SIZE).build();

snapshotReader = new TestSnapshotReader(testConfig);
dataSender = new TestDataSender();
dataSender = new TestDataSender(waitInSnapshotSync);
break;
case STREAMS:
// Default implementation used for Log Replication (stream-based)
snapshotReader = new StreamsSnapshotReader(newRT, config);
dataSender = new TestDataSender();
dataSender = new TestDataSender(waitInSnapshotSync);
break;
default:
break;
Expand Down

0 comments on commit 27f8b7c

Please sign in to comment.