diff --git a/test/src/test/java/org/corfudb/infrastructure/logreplication/LogReplicationFSMTest.java b/test/src/test/java/org/corfudb/infrastructure/logreplication/LogReplicationFSMTest.java index b5adc46be27..9b6eef7e28c 100644 --- a/test/src/test/java/org/corfudb/infrastructure/logreplication/LogReplicationFSMTest.java +++ b/test/src/test/java/org/corfudb/infrastructure/logreplication/LogReplicationFSMTest.java @@ -99,6 +99,12 @@ public class LogReplicationFSMTest extends AbstractViewTest implements Observer // We observe the transition counter to know that a transition occurred. private ObservableValue transitionObservable; + // Flag which determines if state transitions to non-terminal states will be observed. As updates to the observable + // can be delivered with a delay, validating based on non-terminal states can cause intermittent failures(FSM may + // have moved to the next state by the time the update callback is received). So tests can set this flag to + // avoid observing such non-terminal transitions. + private boolean observeTransitions = true; + // Flag indicating if we should observer a snapshot sync, this is to interrupt it at any given stage private boolean observeSnapshotSync = false; private int limitSnapshotMessages = 0; @@ -601,7 +607,10 @@ public void testLogReplicationSnapshotTransmitterBatch() throws Exception { testSnapshotSender(BATCH_SIZE); } - private void testSnapshotSender(int batchSize) throws Exception { + private void testSnapshotSender(int batchSize) { + + // Set the flag to avoid observing transitions to non-terminal states + observeTransitions = false; // Initialize State Machine initLogReplicationFSM(ReaderImplementation.TEST); @@ -618,25 +627,20 @@ private void testSnapshotSender(int batchSize) throws Exception { // Write to Stream will write to some addresses. SnapshotReader should only read from those addresses ((TestSnapshotReader) snapshotReader).setSeqNumsToRead(seqNums); - // Initial acquire of semaphore, the transition method will block until a transition occurs - transitionAvailable.acquire(); - - // Transition #1: Snapshot Sync Request - UUID snapshotSyncRequestId = transition(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST, LogReplicationStateType.IN_SNAPSHOT_SYNC); - - // Block until the snapshot sync completes and next transition occurs. - // The transition should happen to IN_LOG_ENTRY_SYNC state. - Queue listenerQueue = ((TestDataSender) dataSender).getEntryQueue(); + // Input the Snapshot Sync Request to the FSM. + LogReplicationEvent event = new LogReplicationEvent(LogReplicationEventType.SNAPSHOT_SYNC_REQUEST); + fsm.input(event); - while(!fsm.getState().getType().equals(LogReplicationStateType.WAIT_SNAPSHOT_APPLY)) { - transitionAvailable.acquire(); + // Once Snapshot Sync starts, the data must be sent and the fsm will eventually reach the terminal + // IN_LOG_ENTRY_SYNC state + while(fsm.getState().getType() != LogReplicationStateType.IN_LOG_ENTRY_SYNC) { + log.debug("Current State = {}, Waiting for IN_LOG_ENTRY_SYNC state", fsm.getState().getType()); } - - assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.WAIT_SNAPSHOT_APPLY); - - transition(LogReplicationEventType.SNAPSHOT_APPLY_COMPLETE, LogReplicationStateType.IN_LOG_ENTRY_SYNC, snapshotSyncRequestId, true); assertThat(fsm.getState().getType()).isEqualTo(LogReplicationStateType.IN_LOG_ENTRY_SYNC); + // Verify that the expected data was captured in the TestSender's queue. + Queue listenerQueue = ((TestDataSender) dataSender).getEntryQueue(); + assertThat(listenerQueue.size()).isEqualTo(NUM_ENTRIES); for (int i = 0; i < NUM_ENTRIES; i++) { @@ -963,6 +967,9 @@ private UUID transition(LogReplicationEventType eventType, */ @Override public void update(Observable obs, Object arg) { + if (!observeTransitions) { + return; + } if (obs.equals(transitionObservable)) { while (!transitionAvailable.hasQueuedThreads()) { // Wait until some thread is waiting to acquire...