Skip to content

Commit

Permalink
Fix the flakiness in LogReplicationFSMTest.testSnapshotSender. (#3638)
Browse files Browse the repository at this point in the history
testSnapshotSender relies on observing the state transition to a
non-terminal state(InSnapshotSync).  It relies on an Observable to
deliver this update and then directly validates the FSM state.  But the
FSM could have transitioned to the next state, i.e.,
WaitForSnapshotApply or InLogEntrySync by the time the Observable
invokes the update() callback.  Change the test to rely on the terminal
state, i.e., InLogEntrySyncState to make it reliable.

Also removed the dependence on the semaphore for this test as it is much simpler
to wait for the expected state directly.
  • Loading branch information
pankti-m committed Jun 5, 2023
1 parent d3c93e6 commit f2ec6e1
Showing 1 changed file with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public class LogReplicationFSMTest extends AbstractViewTest implements Observer
// We observe the transition counter to know that a transition occurred.
private ObservableValue transitionObservable;

// Flag which determines if state transitions to non-terminal states will be observed. As updates to the observable
// can be delivered with a delay, validating based on non-terminal states can cause intermittent failures(FSM may
// have moved to the next state by the time the update callback is received). So tests can set this flag to
// avoid observing such non-terminal transitions.
private boolean observeTransitions = true;

// Flag indicating if we should observer a snapshot sync, this is to interrupt it at any given stage
private boolean observeSnapshotSync = false;
private int limitSnapshotMessages = 0;
Expand Down Expand Up @@ -601,7 +607,10 @@ public void testLogReplicationSnapshotTransmitterBatch() throws Exception {
testSnapshotSender(BATCH_SIZE);
}

private void testSnapshotSender(int batchSize) throws Exception {
private void testSnapshotSender(int batchSize) {

// Set the flag to avoid observing transitions to non-terminal states
observeTransitions = false;

// Initialize State Machine
initLogReplicationFSM(ReaderImplementation.TEST);
Expand All @@ -618,25 +627,20 @@ private void testSnapshotSender(int batchSize) throws Exception {
// Write to Stream will write to some addresses. SnapshotReader should only read from those addresses
((TestSnapshotReader) snapshotReader).setSeqNumsToRead(seqNums);

// Initial acquire of semaphore, the transition method will block until a transition occurs
transitionAvailable.acquire();

// Transition #1: Snapshot Sync Request
UUID snapshotSyncRequestId = transition(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST, LogReplicationStateType.IN_SNAPSHOT_SYNC);

// Block until the snapshot sync completes and next transition occurs.
// The transition should happen to IN_LOG_ENTRY_SYNC state.
Queue<LogReplicationEntryMsg> listenerQueue = ((TestDataSender) dataSender).getEntryQueue();
// Input the Snapshot Sync Request to the FSM.
LogReplicationEvent event = new LogReplicationEvent(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST);
fsm.input(event);

while(!fsm.getState().getType().equals(LogReplicationStateType.WAIT_SNAPSHOT_APPLY)) {
transitionAvailable.acquire();
// Once Snapshot Sync starts, the data must be sent and the fsm will eventually reach the terminal
// IN_LOG_ENTRY_SYNC state
while(fsm.getState().getType() != LogReplicationStateType.IN_LOG_ENTRY_SYNC) {
log.debug("Current State = {}, Waiting for IN_LOG_ENTRY_SYNC state", fsm.getState().getType());
}

assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.WAIT_SNAPSHOT_APPLY);

transition(LogReplicationEventType.SNAPSHOT_APPLY_COMPLETE, LogReplicationStateType.IN_LOG_ENTRY_SYNC, snapshotSyncRequestId, true);
assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.IN_LOG_ENTRY_SYNC);

// Verify that the expected data was captured in the TestSender's queue.
Queue<LogReplicationEntryMsg> listenerQueue = ((TestDataSender) dataSender).getEntryQueue();

assertThat(listenerQueue.size()).isEqualTo(NUM_ENTRIES);

for (int i = 0; i < NUM_ENTRIES; i++) {
Expand Down Expand Up @@ -963,6 +967,9 @@ private UUID transition(LogReplicationEventType eventType,
*/
@Override
public void update(Observable obs, Object arg) {
if (!observeTransitions) {
return;
}
if (obs.equals(transitionObservable)) {
while (!transitionAvailable.hasQueuedThreads()) {
// Wait until some thread is waiting to acquire...
Expand Down

0 comments on commit f2ec6e1

Please sign in to comment.