Skip to content

Commit

Permalink
Additional compactor tests (#3471)
Browse files Browse the repository at this point in the history
Add CorfuStoreCompactorMain UTs and few refactoring changes
  • Loading branch information
SravanthiAshokKumar committed Jan 24, 2023
1 parent b8a17a4 commit 11e7987
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.corfudb;
package org.corfudb.compactor;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.compactor.CorfuStoreCompactorConfig;
import org.corfudb.runtime.CorfuRuntime.CorfuRuntimeParameters;
import org.corfudb.runtime.proto.service.CorfuMessage.PriorityLevel;
import org.corfudb.util.NodeLocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class CorfuStoreCompactorMain {
private final CorfuStore corfuStore;
private final CorfuStoreCompactorConfig config;
private final DistributedCheckpointerHelper distributedCheckpointerHelper;
private static final int RETRY_CHECKPOINTING = 5;
public static final int RETRY_CHECKPOINTING = 5;
private static final int RETRY_CHECKPOINTING_SLEEP_SECOND = 10;

public CorfuStoreCompactorMain(String[] args) throws Exception {
Expand All @@ -39,6 +39,15 @@ public CorfuStoreCompactorMain(String[] args) throws Exception {
this.distributedCheckpointerHelper = new DistributedCheckpointerHelper(corfuStore);
}

public CorfuStoreCompactorMain(CorfuStoreCompactorConfig config, CorfuRuntime corfuRuntime, CorfuRuntime cpRuntime,
CorfuStore corfuStore, DistributedCheckpointerHelper distributedCheckpointerHelper) {
this.config = config;
this.cpRuntime = cpRuntime;
this.corfuRuntime = corfuRuntime;
this.corfuStore = corfuStore;
this.distributedCheckpointerHelper = distributedCheckpointerHelper;
}

/**
* Entry point to invoke checkpointing
*
Expand All @@ -55,7 +64,7 @@ public static void main(String[] args) {
log.info("Exiting CorfuStoreCompactor");
}

private void doCompactorAction() {
protected void doCompactorAction() {
if (config.isFreezeCompaction() || config.isDisableCompaction()) {
if (config.isDisableCompaction()) {
log.info("Disabling compaction...");
Expand Down Expand Up @@ -87,17 +96,17 @@ private void doCompactorAction() {
}
}
if (config.isStartCheckpointing()) {
checkpoint();
DistributedCheckpointer distributedCheckpointer = new ServerTriggeredCheckpointer(CheckpointerBuilder.builder()
.corfuRuntime(corfuRuntime)
.cpRuntime(Optional.of(cpRuntime))
.persistedCacheRoot(config.getPersistedCacheRoot())
.isClient(false)
.build(), corfuStore, distributedCheckpointerHelper.getCompactorMetadataTables());
checkpoint(distributedCheckpointer);
}
}

private void checkpoint() {
DistributedCheckpointer distributedCheckpointer = new ServerTriggeredCheckpointer(CheckpointerBuilder.builder()
.corfuRuntime(corfuRuntime)
.cpRuntime(Optional.of(cpRuntime))
.persistedCacheRoot(config.getPersistedCacheRoot())
.isClient(false)
.build(), corfuStore, distributedCheckpointerHelper.getCompactorMetadataTables());
protected void checkpoint(DistributedCheckpointer distributedCheckpointer) {
try {
for (int i = 0; i < RETRY_CHECKPOINTING; i++) {
if (distributedCheckpointerHelper.hasCompactionStarted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ private CheckpointWriter<ICorfuTable<?,?>> getCheckpointWriter(TableName tableNa
checkpointerBuilder.cpRuntime.get());
CheckpointWriter<ICorfuTable<?,?>> cpw =
new CheckpointWriter(checkpointerBuilder.cpRuntime.get(), streamId, "ServerCheckpointer", corfuTable);
ISerializer serializer = ((CorfuCompileProxy) corfuTable.getCorfuSMRProxy())
.getSerializer();
cpw.setSerializer(serializer);
cpw.setSerializer(keyDynamicProtobufSerializer);
return cpw;
}

Expand Down
1 change: 1 addition & 0 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def run(self):
"""
Run compactor.
"""
self._print_and_log("Invoked compactor_runner...");
if self._config.freezeCompaction and self._config.unfreezeCompaction:
self._print_and_log("ERROR: Both freeze and unfreeze compaction parameters cannot be passed together")
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.corfudb.compactor;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.DistributedCheckpointer;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;
import org.junit.Before;
import org.junit.Test;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CorfuStoreCompactorMainUnitTest {
private final CorfuRuntime corfuRuntime = mock(CorfuRuntime.class);
private final CorfuRuntime cpRuntime = mock(CorfuRuntime.class);
private final CorfuStore corfuStore = mock(CorfuStore.class);
private final CorfuStoreCompactorConfig config = mock(CorfuStoreCompactorConfig.class);
private final DistributedCheckpointerHelper distributedCheckpointerHelper = mock(DistributedCheckpointerHelper.class);
private final DistributedCheckpointer distributedCheckpointer = mock(DistributedCheckpointer.class);
private CorfuStoreCompactorMain corfuStoreCompactorMain;

@Before
public void setup() {
when(corfuRuntime.getParameters()).thenReturn(CorfuRuntime.CorfuRuntimeParameters.builder().clientName("TestClient").build());
corfuStoreCompactorMain = spy(new CorfuStoreCompactorMain(config, corfuRuntime, cpRuntime, corfuStore, distributedCheckpointerHelper));
doReturn(true).when(distributedCheckpointerHelper).disableCompaction();
doNothing().when(distributedCheckpointerHelper).enableCompaction();
doNothing().when(distributedCheckpointerHelper).freezeCompaction();
doNothing().when(distributedCheckpointerHelper).unfreezeCompaction();
doNothing().when(distributedCheckpointerHelper).instantTrigger(anyBoolean());
doNothing().when(distributedCheckpointer).checkpointTables();
}

@Test
public void doCompactorActionHappyPath() {
doReturn(true).when(config).isFreezeCompaction();
doReturn(true).when(config).isDisableCompaction();
corfuStoreCompactorMain.doCompactorAction();

doReturn(false).when(config).isFreezeCompaction();
doReturn(false).when(config).isDisableCompaction();
doReturn(true).when(config).isUnfreezeCompaction();
doReturn(true).when(config).isEnableCompaction();
doReturn(true).when(config).isInstantTriggerCompaction();
doReturn(true).when(config).isStartCheckpointing();
doNothing().when(corfuStoreCompactorMain).checkpoint(any());
when(config.isTrim()).thenReturn(true).thenReturn(false);
corfuStoreCompactorMain.doCompactorAction();
corfuStoreCompactorMain.doCompactorAction();

verify(distributedCheckpointerHelper).disableCompaction();
verify(distributedCheckpointerHelper).freezeCompaction();
verify(distributedCheckpointerHelper, times(2)).enableCompaction();
verify(distributedCheckpointerHelper, times(2)).unfreezeCompaction();
verify(corfuStoreCompactorMain, times(2)).checkpoint(any());
verify(distributedCheckpointerHelper).instantTrigger(true);
verify(distributedCheckpointerHelper).instantTrigger(false);
}

@Test
public void doCompactorActionIncorrectConfigCombinations() {
doReturn(true).when(config).isFreezeCompaction();
doReturn(true).when(config).isUnfreezeCompaction();
corfuStoreCompactorMain.doCompactorAction();

doReturn(true).when(config).isDisableCompaction();
doReturn(true).when(config).isEnableCompaction();
doReturn(false).when(config).isFreezeCompaction();
doReturn(false).when(config).isUnfreezeCompaction();
corfuStoreCompactorMain.doCompactorAction();

verify(distributedCheckpointerHelper).freezeCompaction();
verify(distributedCheckpointerHelper).disableCompaction();
verify(distributedCheckpointerHelper, never()).unfreezeCompaction();
verify(distributedCheckpointerHelper, never()).enableCompaction();
}

@Test
public void checkpointFailureTest() {
doReturn(false).when(distributedCheckpointerHelper).hasCompactionStarted();
corfuStoreCompactorMain.checkpoint(distributedCheckpointer);

verify(distributedCheckpointerHelper, times(CorfuStoreCompactorMain.RETRY_CHECKPOINTING)).hasCompactionStarted();
verify(distributedCheckpointer).shutdown();
}

@Test
public void checkpointSuccessTest() {
when(distributedCheckpointerHelper.hasCompactionStarted()).thenReturn(false).thenReturn(true);
corfuStoreCompactorMain.checkpoint(distributedCheckpointer);

verify(distributedCheckpointerHelper, times(2)).hasCompactionStarted();
verify(distributedCheckpointer).shutdown();
}
}

0 comments on commit 11e7987

Please sign in to comment.