Skip to content

Commit

Permalink
Add logging fixes for compactor (#3682)
Browse files Browse the repository at this point in the history
1. On OutOfMemory or other errors, the command launched via check_call writes its output (stdout/stderr) to the shell.
Capture that output in the compactor audit logfile instead.
2. When the leader marks the checkpoint status of a table as FAILED, retain
the name of the client that was responsible for checkpointing the table.
  • Loading branch information
SravanthiAshokKumar committed Jun 26, 2023
1 parent 513cdbe commit 9b29e16
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public LeaderInitStatus initCompactionCycle() {

long newCycleCount = managerStatus == null ? 0 : managerStatus.getCycleCount() + 1;
List<TableName> tableNames = new ArrayList<>(corfuStore.listTables(null));
CheckpointingStatus idleStatus = buildCheckpointStatus(StatusType.IDLE, newCycleCount);
CheckpointingStatus idleStatus = buildCheckpointStatus(StatusType.IDLE, nodeEndpoint, newCycleCount);

txn.clear(CompactorMetadataTables.CHECKPOINT_STATUS_TABLE_NAME);
txn.clear(CompactorMetadataTables.ACTIVE_CHECKPOINTS_TABLE_NAME);
Expand Down Expand Up @@ -169,7 +169,7 @@ private boolean checkFailureAndFinishCompactionCycle(TableName table) {

if (tableStatus.getStatus() != StatusType.COMPLETED && tableStatus.getStatus() != StatusType.FAILED) {
txn.putRecord(compactorMetadataTables.getCheckpointingStatusTable(), table,
buildCheckpointStatus(StatusType.FAILED, tableStatus.getCycleCount()), null);
buildCheckpointStatus(StatusType.FAILED, tableStatus.getClientName(), tableStatus.getCycleCount()), null);
txn.delete(CompactorMetadataTables.ACTIVE_CHECKPOINTS_TABLE_NAME, table);
txn.commit();
log.warn("Marked table {}${} FAILED due to no checkpoint activity",
Expand Down Expand Up @@ -287,10 +287,10 @@ private void deleteInstantKeyIfPresent() {
}
}

private CheckpointingStatus buildCheckpointStatus(CheckpointingStatus.StatusType statusType, long compactorCycleCount) {
private CheckpointingStatus buildCheckpointStatus(CheckpointingStatus.StatusType statusType, String clientName, long compactorCycleCount) {
return CheckpointingStatus.newBuilder()
.setStatus(statusType)
.setClientName(nodeEndpoint)
.setClientName(clientName)
.setCycleCount(compactorCycleCount)
.build();
}
Expand Down
3 changes: 2 additions & 1 deletion scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ def _run_corfu_compactor(self):
return
cmd = self._command_builder.get_corfu_compactor_cmd(compactor_config, class_to_invoke)
self._print_and_log("Start compacting. Command %s" % cmd)
check_call(cmd, shell=True)
with open(corfu_paths["CompactorLogfile"], 'a') as f:
check_call(cmd, stdout=f, stderr=f, shell=True)
self._print_and_log("Finished running corfu compactor tool.")

except Exception as ex:
Expand Down

0 comments on commit 9b29e16

Please sign in to comment.