diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index 0ff1ec91acdb..5f67355b0cbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -90,6 +90,9 @@ public class DataPartitionTableIntegrityCheckProcedure // how long to check all datanode are alive, the unit is ms private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000; + // how long to roll back the next state, the unit is ms + private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000; + NodeManager dataNodeManager; private List allDataNodes = new ArrayList<>(); @@ -276,7 +279,8 @@ private Flow collectEarliestTimeslots() { } if (failedDataNodes.size() == allDataNodes.size()) { - setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + delayRollbackNextState( + DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); } else { setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); } @@ -439,7 +443,8 @@ private Flow requestPartitionTables() { } if (failedDataNodes.size() == allDataNodes.size()) { - setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + delayRollbackNextState( + DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); return Flow.HAS_MORE_STATE; } @@ -524,7 +529,8 @@ private Flow requestPartitionTablesHeartBeat() { // Don't find any one data partition table generation task on all registered DataNodes, go back // to the REQUEST_PARTITION_TABLES step and re-execute if (failedDataNodes.size() == allDataNodes.size()) { - setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + delayRollbackNextState( + DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); return Flow.HAS_MORE_STATE; } @@ -554,7 +560,8 @@ private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) { if (dataPartitionTables.isEmpty()) { LOG.error( "[DataPartitionIntegrity] No DataPartitionTables to merge, dataPartitionTables is empty"); - setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + delayRollbackNextState( + DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); return Flow.HAS_MORE_STATE; } @@ -675,7 +682,8 @@ private Flow getFlow() { if (!failedDataNodes.isEmpty()) { allDataNodes.removeAll(failedDataNodes); skipDataNodes = new HashSet<>(allDataNodes); - setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + delayRollbackNextState( + DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); return Flow.HAS_MORE_STATE; } else { skipDataNodes.clear(); @@ -683,6 +691,16 @@ private Flow getFlow() { } } + /** Delay to jump to next state, avoid write raft logs frequently when exception occur */ + private void delayRollbackNextState(DataPartitionTableIntegrityCheckProcedureState state) { + sleep( + ROLL_BACK_NEXT_STATE_INTERVAL, + String.format( + "[DataPartitionIntegrity] Error waiting for roll back the %s state due to thread interruption.", + state)); + setNextState(state); + } + @Override public void serialize(final DataOutputStream stream) throws IOException { super.serialize(stream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e2bfe4f6ad9c..3aa7aad7e4fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -3377,6 +3377,9 @@ private void processDataRegionForEarliestTimeslots(Map earliestTim } Set timePartitionIds = tsFileManager.getTimePartitions(); + if (timePartitionIds.isEmpty()) { + return; + } final long earliestTimeSlotId = Collections.min(timePartitionIds); earliestTimeslots.compute( databaseName,