Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ private void buildActionMap() {
(req, client) -> client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
(req, client) -> client.generateDataPartitionTableHeartbeat());
(req, client) ->
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
actionMap = actionMapBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,15 +443,15 @@ protected void setLoadManager() {
}

public void close() throws IOException {
if (consensusManager.get() != null) {
consensusManager.get().close();
}
if (partitionManager != null) {
partitionManager.getRegionMaintainer().shutdown();
}
if (procedureManager != null) {
procedureManager.stopExecutor();
}
if (consensusManager.get() != null) {
consensusManager.get().close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeHandler;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
Expand Down Expand Up @@ -2329,6 +2330,25 @@ public Pair<Long, Boolean> checkDuplicateTableTask(
return new Pair<>(-1L, false);
}

public boolean isExistUnfinishedProcedure(
Class<? extends StateMachineProcedure<?, ?>> procedureClass) {
if (procedureClass == null) {
return false;
}

for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
if (!procedure.isFinished() && procedureClass.isInstance(procedure)) {
LOGGER.info(
"[{}] procedure details are {}",
procedureClass.getSimpleName(),
procedure.toStringDetails());
return true;
}
}

return false;
}

// ======================================================
/*
GET-SET Region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,25 @@ public TConfigNodeLocation getLeaderLocation() {
return null;
}

public TConfigNodeLocation getNotNullLeaderLocation() {
Peer leaderPeer = getLeaderPeer();

while (leaderPeer == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {

}
leaderPeer = getLeaderPeer();
}

Peer finalLeaderPeer = leaderPeer;
return getNodeManager().getRegisteredConfigNodes().stream()
.filter(leader -> leader.getConfigNodeId() == finalLeaderPeer.getNodeId())
.findFirst()
.orElse(null);
}

/**
* @return true if ConfigNode-leader is elected, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,13 @@ private Flow requestPartitionTablesHeartBeat() {

if (!dataPartitionTables.containsKey(dataNodeId)) {
try {
TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq();
req.setDatabases(databasesWithLostDataPartition);
Object response =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNode.getLocation().getInternalEndPoint(),
null,
req,
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
MAX_RETRY_COUNT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
Expand Down Expand Up @@ -195,6 +196,9 @@ public void active() {
configManager.initConsensusManager();
upgrade();
TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
if (leaderNodeLocation == null) {
leaderNodeLocation = configManager.getConsensusManager().getNotNullLeaderLocation();
}
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
// that the external service is not provided until ConfigNode is fully available
Expand Down Expand Up @@ -225,36 +229,42 @@ public void active() {

/* After the ConfigNode leader election, a leader switch may occur, which could cause the procedure not to be created. This can happen if the original leader has not yet executed the procedure creation, while the other followers have already finished starting up. Therefore, having the original leader (before the leader switch) initiate the process ensures that only one procedure will be created. */
if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
dataPartitionTableCheckFuture =
dataPartitionTableCheckExecutor.submit(
() -> {
LOGGER.info(
"[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());

while (true) {
List<Integer> dnList =
configManager
.getLoadManager()
.filterDataNodeThroughStatus(NodeStatus.Running);
if (dnList != null && !dnList.isEmpty()) {
LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
TSStatus status =
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
"Data partition table integrity check failed! Current status code is {}, status message is {}",
status.getCode(),
status.getMessage());
if (!configManager
.getProcedureManager()
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)) {
dataPartitionTableCheckFuture =
dataPartitionTableCheckExecutor.submit(
() -> {
LOGGER.info(
"[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());

while (true) {
List<Integer> dnList =
configManager
.getLoadManager()
.filterDataNodeThroughStatus(NodeStatus.Running);
if (dnList != null && !dnList.isEmpty()) {
LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
TSStatus status =
configManager
.getProcedureManager()
.dataPartitionTableIntegrityCheck();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
"Data partition table integrity check failed! Current status code is {}, status message is {}",
status.getCode(),
status.getMessage());
}
break;
} else {
LOGGER.info("No running datanodes found, waiting...");
Thread.sleep(5000);
}
break;
} else {
LOGGER.info("No running datanodes found, waiting...");
Thread.sleep(5000);
}
}
return null;
});
return null;
});
}
}
return;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3228,7 +3228,8 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable(
}

@Override
public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() {
public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(
TGenerateDataPartitionTableReq req) {
TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp();
// Must be lower than the RPC request timeout, in milliseconds
final long timeoutMs = 50000;
Expand All @@ -3238,10 +3239,13 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb
// To resolve this situation that the DataNode is registered and didn't request
// generateDataPartitionTable interface yet.
if (currentGeneratorFuture == null || currentGenerator == null) {
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setMessage("No DataPartitionTable generation task found");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
return resp;
generateDataPartitionTable(req);
if (currentGeneratorFuture == null || currentGenerator == null) {
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setMessage("No DataPartitionTable generation task found");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
return resp;
}
}

currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ service IDataNodeRPCService {
/**
* Check the status of DataPartitionTable generation task
*/
TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat()
TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req)

/**
* END: Data Partition Table Integrity Check
Expand Down
Loading