Skip to content

Commit

Permalink
Fix runOrchestrator when managerStatus is null (#3478)
Browse files Browse the repository at this point in the history
* Fix runOrchestrator when managerStatus is null

Return on cases when managerStatus is unable to be acquires as the
runOrchestrator method runs every 10s

* Throw TxnAbortedException in the UT to make the test more realistic
  • Loading branch information
SravanthiAshokKumar committed Jan 17, 2023
1 parent 465d3c2 commit 9fddfea
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ private void runOrchestrator() {
}
txn.commit();
} catch (Exception e) {
log.warn("Unable to acquire manager status: ", e);
log.error("Unable to acquire manager status: ", e);
return;
}
try {
if (isLeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.protobuf.Message;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.wireprotocol.Token;
import org.corfudb.protocols.wireprotocol.TxResolutionInfo;
import org.corfudb.runtime.CompactorMetadataTables;
import org.corfudb.runtime.CorfuCompactorManagement.CheckpointingStatus;
import org.corfudb.runtime.CorfuCompactorManagement.CheckpointingStatus.StatusType;
Expand All @@ -10,6 +12,8 @@
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStoreEntry;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.exceptions.AbortCause;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
import org.corfudb.runtime.proto.RpcCommon;
import org.corfudb.runtime.view.Layout;
Expand All @@ -21,10 +25,12 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -72,6 +78,7 @@ public void setup() {
when(corfuStore.txn(CORFU_SYSTEM_NAMESPACE)).thenReturn(txn);
when(txn.getRecord(CompactorMetadataTables.COMPACTION_MANAGER_TABLE_NAME, CompactorMetadataTables.COMPACTION_MANAGER_KEY)).thenReturn(corfuStoreCompactionManagerEntry);
when(txn.getRecord(CompactorMetadataTables.COMPACTION_CONTROLS_TABLE, CompactorMetadataTables.DISABLE_COMPACTION)).thenReturn(corfuStoreCompactionControlsEntry);
doNothing().when(txn).putRecord(any(), any(), any(), any());
when(txn.commit()).thenReturn(CorfuStoreMetadata.Timestamp.getDefaultInstance());

CompactorService compactorService = new CompactorService(serverContext,
Expand Down Expand Up @@ -137,6 +144,30 @@ public void runOrchestratorLeaderTest() throws Exception {
verify(invokeCheckpointingJvm, times(1)).shutdown();
}

@Test
public void failOnAcquireManagerStatusTest() throws Exception {
Layout mockLayout = mock(Layout.class);
when(corfuRuntime.invalidateLayout()).thenReturn(CompletableFuture.completedFuture(mockLayout));
when(mockLayout.getPrimarySequencer()).thenReturn(NODE_ENDPOINT);

when((CheckpointingStatus) corfuStoreCompactionManagerEntry.getPayload())
.thenReturn(null);
when(txn.commit()).thenThrow(new TransactionAbortedException(
new TxResolutionInfo(UUID.randomUUID(), new Token(0, 0)),
AbortCause.CONFLICT, new Throwable(), null));
when(dynamicTriggerPolicy.shouldTrigger(Matchers.anyLong(), Matchers.any(CorfuStore.class))).thenReturn(true);
doReturn(CompactorLeaderServices.LeaderInitStatus.SUCCESS).when(leaderServices).initCompactionCycle();

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

verify(leaderServices, never()).initCompactionCycle();
}

@Test
public void runOrchestratorSchedulerTest() throws Exception {
Layout mockLayout = mock(Layout.class);
Expand Down

0 comments on commit 9fddfea

Please sign in to comment.