Skip to content

Commit

Permalink
Avoid opening compactor metadata tables on every shouldTrigger call (#…
Browse files Browse the repository at this point in the history
…3600)

When shouldTrigger initializes DistributedCheckpointerHelper on every call,
it opens all the compactor metadata tables leading to excessive openTable logs.
Hence, reuse distributedCheckpointerHelper from CompactorService
  • Loading branch information
SravanthiAshokKumar committed Jun 3, 2023
1 parent c791382 commit d3c93e6
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.corfudb.infrastructure;

import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;

public interface CompactionTriggerPolicy {
boolean shouldTrigger(long interval, CorfuStore corfuStore) throws Exception;
boolean shouldTrigger(long interval, CorfuStore corfuStore, DistributedCheckpointerHelper distributedCheckpointerHelper) throws Exception;

void markCompactionCycleStart();
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private void runOrchestrator() {
compactorLeaderServices.validateLiveness();
}
} else if (compactionTriggerPolicy.shouldTrigger(
this.corfuRuntimeParameters.getCheckpointTriggerFreqMillis(), getCorfuStore())) {
this.corfuRuntimeParameters.getCheckpointTriggerFreqMillis(), getCorfuStore(), getDistributedCheckpointerHelper())) {
trimLog.invokePrefixTrim(getCorfuRuntime(), getCorfuStore());
compactionTriggerPolicy.markCompactionCycleStart();
compactorLeaderServices.initCompactionCycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ private boolean shouldForceTrigger(CorfuStore corfuStore) {
* @return true if compaction cycle should run, false otherwise
*/
@Override
public boolean shouldTrigger(long interval, CorfuStore corfuStore) throws Exception {
DistributedCheckpointerHelper distributedCheckpointerHelper = new DistributedCheckpointerHelper(corfuStore);
public boolean shouldTrigger(long interval, CorfuStore corfuStore, DistributedCheckpointerHelper distributedCheckpointerHelper) {

if (distributedCheckpointerHelper.isCompactionDisabled()) {
log.warn("Compaction has been disabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void checkpointTables() {
for (TableName tableName : tableNames) {
boolean isSuccess = tryCheckpointTable(tableName, t -> getCheckpointWriter(t, keyDynamicProtobufSerializer));
if (!isSuccess) {
log.warn("Stop checkpointing after failure in {}${}", tableName.getNamespace(), tableName.getTableName());
log.warn("Stop checkpointing after failure");
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void singleServerTest() throws Exception {

CompactorService compactorService0 = spy(new CompactorService(sc0, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm0, dynamicTriggerPolicy0));
doReturn(runtime0).when(compactorService0).getNewCorfuRuntime();
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

try {
Expand All @@ -403,13 +403,13 @@ public void multipleServerTest() throws Exception {

CompactorService compactorService0 = spy(new CompactorService(sc0, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm0, dynamicTriggerPolicy0));
doReturn(runtime0).when(compactorService0).getNewCorfuRuntime();
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

DynamicTriggerPolicy dynamicTriggerPolicy1 = mock(DynamicTriggerPolicy.class);
CompactorService compactorService1 = spy(new CompactorService(sc1, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm1, dynamicTriggerPolicy1));
doReturn(runtime1).when(compactorService1).getNewCorfuRuntime();
when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService1.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

try {
Expand Down Expand Up @@ -440,13 +440,13 @@ public void leaderFailureTest() throws Exception {
CompactorService compactorService2 = spy(new CompactorService(sc2, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm2, dynamicTriggerPolicy2));
doReturn(runtime2).when(compactorService2).getNewCorfuRuntime();

when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService1.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

when(dynamicTriggerPolicy2.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy2.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService2.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

try {
Expand Down Expand Up @@ -480,13 +480,13 @@ public void nonLeaderFailureTest() throws Exception {
CompactorService compactorService2 = spy(new CompactorService(sc2, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm2, dynamicTriggerPolicy2));
doReturn(runtime2).when(compactorService2).getNewCorfuRuntime();

when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy1.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService1.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

when(dynamicTriggerPolicy2.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy2.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService2.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

try {
Expand All @@ -512,7 +512,7 @@ public void checkpointFailureTest() throws Exception {

CompactorService compactorService0 = spy(new CompactorService(sc0, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm0, dynamicTriggerPolicy0));
doReturn(runtime0).when(compactorService0).getNewCorfuRuntime();
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

Table<StringKey, RpcCommon.TokenMsg, Message> checkpointTable = openCompactionControlsTable();
Expand Down Expand Up @@ -558,7 +558,7 @@ public void runOrchestratorLeaderInitManagerStatusTest() throws Exception {
testSetup(logSizeLimitPercentageFull);
CompactorService compactorService0 = spy(new CompactorService(sc0, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm0, new DynamicTriggerPolicy()));
doReturn(runtime0).when(compactorService0).getNewCorfuRuntime();
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

try {
Expand Down Expand Up @@ -591,7 +591,7 @@ public void quotaExceededTest() throws Exception {
testSetup(logSizeLimitPercentageLow);
CompactorService compactorService0 = spy(new CompactorService(sc0, Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL), mockInvokeJvm0, dynamicTriggerPolicy0));
doReturn(runtime0).when(compactorService0).getNewCorfuRuntime();
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService0.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

// Write entries to the stream until the quota has been exhausted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.corfudb.runtime.CorfuCompactorManagement.CheckpointingStatus.StatusType;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStoreEntry;
import org.corfudb.runtime.collections.TxnContext;
Expand All @@ -19,7 +20,6 @@
import org.corfudb.runtime.view.Layout;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;

import java.time.Duration;
import java.util.HashMap;
Expand All @@ -30,6 +30,7 @@
import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.after;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -140,7 +141,7 @@ public void runOrchestratorLeaderTest() throws Exception {
when((CheckpointingStatus) corfuStoreCompactionManagerEntry.getPayload())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.FAILED).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build());
when(dynamicTriggerPolicy.shouldTrigger(Matchers.anyLong(), Matchers.any(CorfuStore.class))).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy.shouldTrigger(anyLong(), any(CorfuStore.class), any(DistributedCheckpointerHelper.class))).thenReturn(true).thenReturn(false);
doNothing().when(leaderServices).validateLiveness();
doReturn(CompactorLeaderServices.LeaderInitStatus.SUCCESS).when(leaderServices).initCompactionCycle();
when(invokeCheckpointingJvm.isRunning()).thenReturn(false).thenReturn(true);
Expand All @@ -164,7 +165,7 @@ public void failOnAcquireManagerStatusTest() throws Exception {
when(txn.commit()).thenThrow(new TransactionAbortedException(
new TxResolutionInfo(UUID.randomUUID(), new Token(0, 0)),
AbortCause.CONFLICT, new Throwable(), null));
when(dynamicTriggerPolicy.shouldTrigger(Matchers.anyLong(), Matchers.any(CorfuStore.class))).thenReturn(true);
when(dynamicTriggerPolicy.shouldTrigger(anyLong(), any(CorfuStore.class), any(DistributedCheckpointerHelper.class))).thenReturn(true);
doReturn(CompactorLeaderServices.LeaderInitStatus.SUCCESS).when(leaderServices).initCompactionCycle();

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
Expand All @@ -185,7 +186,7 @@ public void runOrchestratorSchedulerTest() throws Exception {
when((CheckpointingStatus) corfuStoreCompactionManagerEntry.getPayload())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.FAILED).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build());
when(dynamicTriggerPolicy.shouldTrigger(Matchers.anyLong(), Matchers.any(CorfuStore.class))).thenReturn(true).thenReturn(false);
when(dynamicTriggerPolicy.shouldTrigger(anyLong(), any(CorfuStore.class), any(DistributedCheckpointerHelper.class))).thenReturn(true).thenReturn(false);
doNothing().when(leaderServices).validateLiveness();
doReturn(CompactorLeaderServices.LeaderInitStatus.SUCCESS).when(leaderServices).initCompactionCycle();
when(invokeCheckpointingJvm.isRunning())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.extern.slf4j.Slf4j;
import org.corfudb.runtime.CompactorMetadataTables;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStoreEntry;
import org.corfudb.runtime.collections.TxnContext;
Expand Down Expand Up @@ -47,35 +48,35 @@ public void testShouldTrigger() throws Exception {
when(corfuStoreEntry.getPayload()).thenReturn(null);

dynamicTriggerPolicy.markCompactionCycleStart();
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));

try {
TimeUnit.MILLISECONDS.sleep(INTERVAL * 2);
} catch (InterruptedException e) {
log.warn("Sleep interrupted: ", e);
}
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));
}

@Test
public void testShouldForceTrigger() throws Exception {
when((RpcCommon.TokenMsg) corfuStoreEntry.getPayload()).thenReturn(null)
.thenReturn(null)
.thenReturn(RpcCommon.TokenMsg.getDefaultInstance());
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));
}

@Test
public void testDisableCompaction() throws Exception {
when((RpcCommon.TokenMsg) corfuStoreEntry.getPayload()).thenReturn(RpcCommon.TokenMsg.getDefaultInstance()).thenReturn(null);
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));
}

@Test
public void testCheckpointFrozen() throws Exception {
when((RpcCommon.TokenMsg) corfuStoreEntry.getPayload()).thenReturn(null).thenReturn(RpcCommon.TokenMsg.newBuilder()
.setSequence(System.currentTimeMillis()).build());
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert !dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));
}

@Test
Expand All @@ -85,7 +86,7 @@ public void testCheckpointFrozenReturnFalse() throws Exception {
.thenReturn(null)
.thenReturn(RpcCommon.TokenMsg.newBuilder().setSequence(System.currentTimeMillis() - patience).build())
.thenReturn(RpcCommon.TokenMsg.newBuilder().setSequence(System.currentTimeMillis()).build());
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore);
assert dynamicTriggerPolicy.shouldTrigger(INTERVAL, corfuStore, new DistributedCheckpointerHelper(corfuStore));
verify(txn, times(1)).delete(CompactorMetadataTables.COMPACTION_CONTROLS_TABLE, CompactorMetadataTables.FREEZE_TOKEN);
}
}

0 comments on commit d3c93e6

Please sign in to comment.