Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {

Expand Down Expand Up @@ -85,7 +86,9 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
// Pipe only collect user's data, filter metric database here.
// DatabaseName may be null for config region group
if (!SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) {
if (Objects.isNull(databaseName)
|| (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + "."))) {
// null or -1 means empty origin leader
final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId());
final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
if (databaseName != null
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
&& currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) {
// Pipe only collect user's data, filter metric database here.
updatedConsensusGroupIdToTaskMetaMap.put(
Expand All @@ -151,15 +152,16 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
// config region
updatedConsensusGroupIdToTaskMetaMap.put(
// 0 is the consensus group id of the config region, but data region id and schema region
// id
// also start from 0, so we use Integer.MIN_VALUE to represent the config region
// id also start from 0, so we use Integer.MIN_VALUE to represent the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
configRegionTaskMeta.getProgressIndex(),
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}

updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);

// If the pipe's previous status was user stopped, then after the alter operation, the pipe's
// status remains user stopped; otherwise, it becomes running.
if (!pipeTaskInfo.get().isPipeStoppedByUser(alterPipeRequest.getPipeName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
if (databaseName != null
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) {
// Pipe only collect user's data, filter out metric database here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
Expand Down