Skip to content

Commit

Permalink
Retry on TransactionAbortedException during unlockTableAfterCheckpoint (#3473)
Browse files Browse the repository at this point in the history
A TransactionAbortedException (TAE) can also be thrown by txn.delete(activeCheckpointTable),
and in that case we need to retry. If the TAE is instead caused by updating the checkpoint
status of the current table, retrying is still safe, because the method is guarded by checks
on compactorCycleCount, managerStatus and tableStatus.
  • Loading branch information
SravanthiAshokKumar committed Jan 4, 2023
1 parent e3a9c5a commit dcab1d4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private boolean unlockTableAfterCheckpoint(@NonNull CompactorMetadataTables comp
CheckpointingStatus tableStatus = (CheckpointingStatus) txn.getRecord(
CompactorMetadataTables.CHECKPOINT_STATUS_TABLE_NAME, tableName).getPayload();
if (tableStatus.getStatus() == StatusType.FAILED) {
//Leader marked me as failed
log.error("Table status for {}${} has already been marked as FAILED",
tableName.getNamespace(), tableName.getTableName());
txn.commit();
Expand All @@ -120,7 +121,6 @@ private boolean unlockTableAfterCheckpoint(@NonNull CompactorMetadataTables comp
} catch (TransactionAbortedException e) {
log.error("TransactionAbortedException exception while trying to unlock table {}${}: {}",
tableName.getNamespace(), tableName.getTableName(), e.getMessage());
break; //Leader marked me as failed
} catch (RuntimeException re) {
if (isCriticalRuntimeException(re, retry, MAX_RETRIES)) {
throw new IllegalStateException(re);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ public void unlockTableAfterCheckpointTest() {
//Fail on different cycleCount values
assert !distributedCheckpointer.tryCheckpointTable(tableName, t -> cpw);

when((CheckpointingStatus) corfuStoreEntry.getPayload())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.IDLE).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build());
when(txn.commit()).thenReturn(Timestamp.getDefaultInstance()) //commit in tryLockTableToCheckpoint()
.thenThrow(new TransactionAbortedException(
new TxResolutionInfo(UUID.randomUUID(), new Token(0, 0)),
AbortCause.CONFLICT, new Throwable(), null))
.thenReturn(Timestamp.getDefaultInstance());
assert distributedCheckpointer.tryCheckpointTable(tableName, t -> cpw);

when((CheckpointingStatus) corfuStoreEntry.getPayload())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.IDLE).build())
Expand Down

0 comments on commit dcab1d4

Please sign in to comment.