Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class DataPartitionTableIntegrityCheckProcedure
// how long to check all datanode are alive, the unit is ms
private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000;

// how long to roll back the next state, the unit is ms
private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000;

NodeManager dataNodeManager;
private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();

Expand Down Expand Up @@ -276,7 +279,8 @@ private Flow collectEarliestTimeslots() {
}

if (failedDataNodes.size() == allDataNodes.size()) {
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
delayRollbackNextState(
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
} else {
setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS);
}
Expand Down Expand Up @@ -439,7 +443,8 @@ private Flow requestPartitionTables() {
}

if (failedDataNodes.size() == allDataNodes.size()) {
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
delayRollbackNextState(
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
}

Expand Down Expand Up @@ -524,7 +529,8 @@ private Flow requestPartitionTablesHeartBeat() {
// Don't find any one data partition table generation task on all registered DataNodes, go back
// to the REQUEST_PARTITION_TABLES step and re-execute
if (failedDataNodes.size() == allDataNodes.size()) {
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
delayRollbackNextState(
DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
return Flow.HAS_MORE_STATE;
}

Expand Down Expand Up @@ -554,7 +560,8 @@ private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) {
if (dataPartitionTables.isEmpty()) {
LOG.error(
"[DataPartitionIntegrity] No DataPartitionTables to merge, dataPartitionTables is empty");
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
delayRollbackNextState(
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
}

Expand Down Expand Up @@ -675,14 +682,25 @@ private Flow getFlow() {
if (!failedDataNodes.isEmpty()) {
allDataNodes.removeAll(failedDataNodes);
skipDataNodes = new HashSet<>(allDataNodes);
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
delayRollbackNextState(
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
} else {
skipDataNodes.clear();
return Flow.NO_MORE_STATE;
}
}

/** Delay to jump to next state, avoid write raft logs frequently when exception occur */
private void delayRollbackNextState(DataPartitionTableIntegrityCheckProcedureState state) {
sleep(
ROLL_BACK_NEXT_STATE_INTERVAL,
String.format(
"[DataPartitionIntegrity] Error waiting for roll back the %s state due to thread interruption.",
state));
setNextState(state);
}

@Override
public void serialize(final DataOutputStream stream) throws IOException {
super.serialize(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3377,6 +3377,9 @@ private void processDataRegionForEarliestTimeslots(Map<String, Long> earliestTim
}

Set<Long> timePartitionIds = tsFileManager.getTimePartitions();
if (timePartitionIds.isEmpty()) {
return;
}
final long earliestTimeSlotId = Collections.min(timePartitionIds);
earliestTimeslots.compute(
databaseName,
Expand Down
Loading