Skip to content

Commit

Permalink
Mark compaction cycle status through finishCompactionCycle() only (#3461
Browse files Browse the repository at this point in the history
)

* Mark compaction cycle status through finishCompactionCycle() only

* Include instant_trigger key on failure test
  • Loading branch information
SravanthiAshokKumar committed Dec 20, 2022
1 parent 12d857e commit b6f816c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public void validateLiveness() {
for (TableName table : activeCheckpointTables) {
if (!livenessValidator.isTableCheckpointActive(table, Duration.ofMillis(currentTime)) &&
checkFailureAndFinishCompactionCycle(table)) {
log.info("Invoking finishCompactionCycle");
finishCompactionCycle();
break;
}
}
Expand All @@ -165,21 +167,9 @@ private boolean checkFailureAndFinishCompactionCycle(TableName table) {
txn.putRecord(compactorMetadataTables.getCheckpointingStatusTable(), table,
buildCheckpointStatus(StatusType.FAILED, tableStatus.getCycleCount()), null);
txn.delete(CompactorMetadataTables.ACTIVE_CHECKPOINTS_TABLE_NAME, table);

CheckpointingStatus managerStatus = (CheckpointingStatus) txn.getRecord(
CompactorMetadataTables.COMPACTION_MANAGER_TABLE_NAME,
CompactorMetadataTables.COMPACTION_MANAGER_KEY).getPayload();
txn.putRecord(compactorMetadataTables.getCompactionManagerTable(), CompactorMetadataTables.COMPACTION_MANAGER_KEY,
buildCheckpointStatus(
StatusType.FAILED,
managerStatus.getTableSize(),
System.currentTimeMillis() - managerStatus.getTimeTaken(),
managerStatus.getCycleCount()),
null);
txn.commit();
log.warn("Finished compaction cycle. FAILED due to no checkpoint activity of table: {}${}",
log.warn("Marked table {}${} FAILED due to no checkpoint activity",
table.getNamespace(), table.getTableName());
livenessValidator.clearLivenessMap();
return true;
} else {
txn.delete(CompactorMetadataTables.ACTIVE_CHECKPOINTS_TABLE_NAME, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private boolean tryLockTableToCheckpoint(@NonNull CompactorMetadataTables compac
break;
} catch (TransactionAbortedException e) {
if (e.getAbortCause() == AbortCause.CONFLICT) {
log.info("My opened table {}${} is being checkpointed by someone else",
log.info("Table {}${} is being checkpointed by someone else",
tableName.getNamespace(), tableName.getTableName());
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ private boolean pollForFinishCheckpointing() {
if (managerStatus != null && (managerStatus.getStatus() == StatusType.COMPLETED
|| managerStatus.getStatus() == StatusType.FAILED)) {
log.info("done pollForFinishCp: {}", managerStatus.getStatus());
System.out.println("done pollForFinishCp: " + managerStatus.getStatus());
return true;
}
}
Expand Down Expand Up @@ -513,6 +512,15 @@ public void checkpointFailureTest() throws Exception {
when(dynamicTriggerPolicy0.shouldTrigger(Matchers.anyLong(), Matchers.any())).thenReturn(true).thenReturn(false);
compactorService1.start(Duration.ofMillis(COMPACTOR_SERVICE_INTERVAL));

Table<StringKey, RpcCommon.TokenMsg, Message> checkpointTable = openCompactionControlsTable();
try (TxnContext txn = corfuStore.txn(CORFU_SYSTEM_NAMESPACE)) {
txn.putRecord(checkpointTable,
CompactorMetadataTables.INSTANT_TIGGER,
RpcCommon.TokenMsg.newBuilder().setSequence(System.currentTimeMillis()).build(),
null);
txn.commit();
}

try {
TimeUnit.MILLISECONDS.sleep(LIVENESS_TIMEOUT.toMillis() / 2);
Table<TableName, ActiveCPStreamMsg, Message> activeCheckpointTable = openActiveCheckpointsTable();
Expand All @@ -539,6 +547,7 @@ public void checkpointFailureTest() throws Exception {

assert verifyManagerStatus(StatusType.FAILED);
assert verifyCheckpointStatusTable(StatusType.COMPLETED, 1);
assert verifyCompactionControlsTable(CompactorMetadataTables.INSTANT_TIGGER) == 0;
}

@Test
Expand Down Expand Up @@ -615,7 +624,7 @@ public void instantTriggerUpgradeTest() {
try (TxnContext txn = corfuStore.txn(CORFU_SYSTEM_NAMESPACE)) {
txn.putRecord(checkpointTable,
CompactorMetadataTables.INSTANT_TIGGER_WITH_TRIM,
RpcCommon.TokenMsg.getDefaultInstance(),
RpcCommon.TokenMsg.newBuilder().setSequence(System.currentTimeMillis()).build(),
null);
txn.commit();
}
Expand Down

0 comments on commit b6f816c

Please sign in to comment.