Skip to content

Commit

Permalink
Fix compactor tests (#3632)
Browse files Browse the repository at this point in the history
- Fix flaky compactor test - validateLivenessNonLeaderTest
- Remove sleeps in CompactorServiceUnitTest
  • Loading branch information
SravanthiAshokKumar committed May 24, 2023
1 parent f89653a commit 5a5f336
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

Expand All @@ -57,9 +56,8 @@ public class CompactorServiceUnitTest {

private static final int SCHEDULER_INTERVAL = 1;
private static final String NODE_ENDPOINT = "NodeEndpoint";
private static final int SLEEP_WAIT = 8;
private static final Duration TIMEOUT = Duration.ofSeconds(8);
private static final String NODE_0 = "0";
private static final String SLEEP_INTERRUPTED_EXCEPTION_MSG = "Sleep interrupted";

@Before
public void setup() throws Exception {
Expand Down Expand Up @@ -104,14 +102,9 @@ public void runOrchestratorNonLeaderTest() {
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(invokeCheckpointingJvm, times(1)).shutdown();
verify(invokeCheckpointingJvm).invokeCheckpointing();

verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).shutdown();
verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).invokeCheckpointing();
}

@Test
Expand All @@ -130,14 +123,9 @@ public void runOrchestratorNonLeaderOnExceptionTest() throws Exception {
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(invokeCheckpointingJvm, times(1)).shutdown();
verify(invokeCheckpointingJvm).invokeCheckpointing();

verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).shutdown();
verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).invokeCheckpointing();
}

@Test
Expand All @@ -159,15 +147,10 @@ public void runOrchestratorLeaderTest() throws Exception {
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(leaderServices).validateLiveness();
verify(leaderServices).initCompactionCycle();
verify(invokeCheckpointingJvm, times(1)).shutdown();

verify(leaderServices, timeout(TIMEOUT.toMillis())).validateLiveness();
verify(leaderServices, timeout(TIMEOUT.toMillis())).initCompactionCycle();
verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).shutdown();
}

@Test
Expand All @@ -185,13 +168,8 @@ public void failOnAcquireManagerStatusTest() throws Exception {
doReturn(CompactorLeaderServices.LeaderInitStatus.SUCCESS).when(leaderServices).initCompactionCycle();

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(leaderServices, never()).initCompactionCycle();
verify(leaderServices, after((int) TIMEOUT.toMillis()).never()).initCompactionCycle();
}

@Test
Expand All @@ -215,18 +193,9 @@ public void runOrchestratorSchedulerTest() throws Exception {
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
while (true) {
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
break;
} catch (InterruptedException e) {
//sleep gets interrupted on throwable, hence the loop
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}
}

verify(leaderServices).initCompactionCycle();
verify(invokeCheckpointingJvm, times(1)).shutdown();

verify(leaderServices, timeout(TIMEOUT.toMillis())).initCompactionCycle();
verify(invokeCheckpointingJvm, timeout(TIMEOUT.toMillis())).shutdown();
}

@Test
Expand All @@ -245,14 +214,9 @@ public void disableCompactionAfterStartedTest() {
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(leaderServices, never()).validateLiveness();
verify(leaderServices, never()).initCompactionCycle();

verify(leaderServices, after((int) TIMEOUT.toMillis()).never()).validateLiveness();
verify(leaderServices, after((int) TIMEOUT.toMillis()).never()).initCompactionCycle();
verify(leaderServices, atLeastOnce()).finishCompactionCycle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ public void invokeConcurrentSystemDownHandlerTest() throws Exception {

CorfuStore corfuStore = mock(CorfuStore.class);
TxnContext txn = mock(TxnContext.class);
CorfuStoreEntry record = mock(CorfuStoreEntry.class);
CorfuStoreEntry corfuStoreEntry = mock(CorfuStoreEntry.class);
doReturn(txn).when(corfuStore).txn(any());
doReturn(record).when(txn).getRecord(anyString(), any());
doReturn(null).when(record).getPayload();
doReturn(corfuStoreEntry).when(txn).getRecord(anyString(), any());
doReturn(null).when(corfuStoreEntry).getPayload();
doReturn(corfuStore).when(compactorServiceSpy).getCorfuStore();

//return runtime2 when systemHandler is invoked the 2nd time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,19 +479,17 @@ public void validateLivenessNonLeaderTest() throws Exception {
.persistedCacheRoot(Optional.empty())
.build(), corfuStore, compactorMetadataTables);
distributedCheckpointer.checkpointTables();

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(compactorLeaderServices1::validateLiveness, 0,
LIVENESS_TIMEOUT, TimeUnit.MILLISECONDS);
//If the leader doesn't invoke the validateLiveness method, it will never invoke finishCompactionCycle

try {
TimeUnit.MILLISECONDS.sleep(LIVENESS_TIMEOUT);
while (!verifyCheckpointStatusTable(StatusType.COMPLETED, 0)) {
TimeUnit.MILLISECONDS.sleep(LIVENESS_TIMEOUT);
}
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

assert verifyManagerStatus(StatusType.STARTED);
assert verifyCheckpointStatusTable(StatusType.COMPLETED, 0);
}

@Test
Expand Down

0 comments on commit 5a5f336

Please sign in to comment.