Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
be7d02c
Reduce invalid Pipe startup
luoluoyuyu Nov 12, 2024
31c394c
fix
luoluoyuyu Nov 12, 2024
2e4ed46
update PipeHistoricalDataRegionTsFileAndDeletionExtractor
luoluoyuyu Nov 13, 2024
15c2971
update PipeTaskInfo
luoluoyuyu Nov 13, 2024
edfc5f2
fix
luoluoyuyu Nov 14, 2024
369b363
fix PipeHistoricalDataRegionTsFileAndDeletionExtractor
luoluoyuyu Nov 14, 2024
901bcbf
Merge branch 'apache:master' into reduce-invalid-pipe-startup
luoluoyuyu Nov 14, 2024
4c3a9bb
update PipeTaskInfo
luoluoyuyu Nov 14, 2024
42f4b81
fix
luoluoyuyu Nov 14, 2024
abd5bbb
fix
luoluoyuyu Nov 14, 2024
d0db06b
fix
luoluoyuyu Nov 14, 2024
75fa9f0
fix
luoluoyuyu Nov 14, 2024
04279d4
update PipeHistoricalDataRegionTsFileAndDeletionExtractor
luoluoyuyu Nov 14, 2024
a196113
update PipeTaskInfo
luoluoyuyu Nov 14, 2024
dc11f02
fix PipeTaskInfo
luoluoyuyu Nov 15, 2024
d7bfcb6
fix ci
luoluoyuyu Nov 15, 2024
97807f0
fix ci
luoluoyuyu Nov 18, 2024
a307e93
fix ci
luoluoyuyu Nov 18, 2024
1a6b360
Revert "fix ci"
luoluoyuyu Nov 19, 2024
53d0957
Revert "fix ci"
luoluoyuyu Nov 19, 2024
7f6896e
Ignore test
luoluoyuyu Nov 19, 2024
47fc358
Reduce unnecessary disassembly of TSFile by PipeRealtimeDataRegionExt…
luoluoyuyu Nov 20, 2024
e5cb5e4
update TablePattern
luoluoyuyu Nov 20, 2024
29843bf
Revert "update TablePattern"
luoluoyuyu Nov 20, 2024
0ef3b41
Revert "Reduce unnecessary disassembly of TSFile by PipeRealtimeDataR…
luoluoyuyu Nov 20, 2024
b25c9ce
Revert "fix ci"
luoluoyuyu Nov 21, 2024
b9b3c51
Revert "Ignore test"
luoluoyuyu Nov 21, 2024
b6bebcf
Merge branch 'master' of https://github.com/apache/iotdb into pr/14059
SteveYurongSu Nov 25, 2024
fd6529e
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Nov 25, 2024
dc6de5d
Update PipeTaskInfo.java
SteveYurongSu Nov 25, 2024
b3428ab
refactor
SteveYurongSu Nov 25, 2024
e77d4ba
refactor
SteveYurongSu Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.confignode.procedure.impl.pipe;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
Expand All @@ -30,6 +32,9 @@
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -390,9 +395,8 @@ protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProc
throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}

Expand All @@ -407,10 +411,9 @@ protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProc
public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
ConfigNodeProcedureEnv env, AtomicReference<PipeTaskInfo> pipeTaskInfo) throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}

Expand Down Expand Up @@ -487,7 +490,9 @@ protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env
protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
String pipeName, ConfigNodeProcedureEnv env) throws IOException {
return env.pushSinglePipeMetaToDataNodes(
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
copyAndFilterOutNonWorkingDataRegionPipeTasks(
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName))
.serialize());
}

/**
Expand All @@ -502,6 +507,68 @@ protected Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(
return env.dropSinglePipeOnDataNodes(pipeName);
}

public static PipeMeta copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta originalPipeMeta)
throws IOException {
final PipeMeta copiedPipeMeta = originalPipeMeta.deepCopy4TaskAgent();

copiedPipeMeta
.getRuntimeMeta()
.getConsensusGroupId2TaskMetaMap()
.entrySet()
.removeIf(
consensusGroupId2TaskMeta -> {
final String database;
try {
database =
ConfigNode.getInstance()
.getConfigManager()
.getPartitionManager()
.getRegionDatabase(
new TConsensusGroupId(
// We assume that the consensus group id is a data region id.
TConsensusGroupType.DataRegion,
consensusGroupId2TaskMeta.getKey()));
if (database == null) {
// If the consensus group id is not a data region id, we keep it.
// If the consensus group id is a data region id, but the database is not found,
// we keep it.
return false;
}
} catch (final Exception ignore) {
// In case of any exception, we keep the consensus group id.
return false;
}

final boolean isTableModel;
try {
final TDatabaseSchema schema =
ConfigNode.getInstance()
.getConfigManager()
.getClusterSchemaManager()
.getDatabaseSchemaByName(database);
if (schema == null) {
// If the database is not found, we keep it.
return false;
}
isTableModel = schema.isIsTableModel();
} catch (final Exception ignore) {
// If the database is not found, we keep it.
return false;
}

try {
return !DataRegionListeningFilter.shouldDatabaseBeListened(
copiedPipeMeta.getStaticMeta().getExtractorParameters(),
isTableModel,
database);
} catch (final Exception e) {
return false;
}
});

return copiedPipeMeta;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks;

public abstract class AbstractOperateSubscriptionAndPipeProcedure
extends AbstractOperateSubscriptionProcedure {
private static final Logger LOGGER =
Expand Down Expand Up @@ -135,7 +137,7 @@ protected Map<Integer, TPushPipeMetaResp> pushMultiPipeMetaToDataNodes(
LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its meta.", pipeName);
continue;
}
pipeMetaBinaryList.add(pipeMeta.serialize());
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushMultiPipeMetaToDataNodes(pipeMetaBinaryList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,34 @@ public class DataRegionListeningFilter {
}
}

public static boolean shouldDatabaseBeListened(
final PipeParameters parameters, final boolean isTableModel, final String databaseRawName)
throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
parseInsertionDeletionListeningOptionPair(parameters);
final boolean hasSpecificListeningOption =
insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
if (!hasSpecificListeningOption) {
return false;
}

if (isTableModel) {
final String databaseTableModel =
databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName;
final TablePattern tablePattern =
TablePattern.parsePipePatternFromSourceParameters(parameters);
return tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.matchesDatabase(databaseTableModel);
} else {
final String databaseTreeModel =
databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName;
final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
return treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDb(databaseTreeModel);
}
}

public static boolean shouldDataRegionBeListened(
PipeParameters parameters, DataRegionId dataRegionId) throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor

private TreePattern treePattern;
private TablePattern tablePattern;
private boolean isDbNameCoveredByPattern = false;

private boolean isModelDetected = false;
private boolean isTableModel;
private boolean isDbNameCoveredByPattern = false;

private boolean isHistoricalExtractorEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time
Expand All @@ -153,8 +155,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor

private volatile boolean hasBeenStarted = false;

private final Map<TsFileResource, Boolean> tsfile2IsTableModelMap = new HashMap<>(0);

private Queue<PersistentResource> pendingQueue;

@Override
Expand Down Expand Up @@ -669,49 +669,34 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso
return deviceSet.stream()
.anyMatch(
deviceID -> {
if (deviceID instanceof PlainDeviceID
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
|| deviceID.getTableName().equals(PATH_ROOT)) {
// In case of tree model deviceID
updateIsDbNameCoveredByPattern(resource, false);
if (treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDevice(deviceID)) {
tsfile2IsTableModelMap.computeIfAbsent(
resource, (tsFileResource) -> Boolean.FALSE);
return true;
}
} else {
// In case of table model deviceID
updateIsDbNameCoveredByPattern(resource, true);
if (tablePattern.isTableModelDataAllowedToBeCaptured()
// The database name in resource is prefixed with "root."
&& tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
&& tablePattern.matchesTable(deviceID.getTableName())) {
tsfile2IsTableModelMap.computeIfAbsent(
resource, (tsFileResource) -> Boolean.TRUE);
return true;
}
if (!isModelDetected) {
detectModel(resource, deviceID);
isModelDetected = true;
}
return false;

return isTableModel
? (tablePattern.isTableModelDataAllowedToBeCaptured()
// The database name in resource is prefixed with "root."
&& tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
&& tablePattern.matchesTable(deviceID.getTableName()))
: (treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDevice(deviceID));
});
}

private void updateIsDbNameCoveredByPattern(
final TsFileResource resource, final boolean isTableModel) {
if (isModelDetected) {
return;
}
private void detectModel(final TsFileResource resource, final IDeviceID deviceID) {
this.isTableModel =
!(deviceID instanceof PlainDeviceID
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
|| deviceID.getTableName().equals(PATH_ROOT));

final String databaseName = resource.getDatabaseName();
if (Objects.nonNull(databaseName)) {
isDbNameCoveredByPattern =
isTableModel
? tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.coversDb(databaseName.substring(5))
: treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.coversDb(databaseName);
isModelDetected = true;
}
isDbNameCoveredByPattern =
isTableModel
? tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.coversDb(databaseName.substring(5))
: treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.coversDb(databaseName);
}

private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) {
Expand Down Expand Up @@ -808,7 +793,7 @@ private Event supplyTerminateEvent() {
private Event supplyTsFileEvent(TsFileResource resource) {
final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
tsfile2IsTableModelMap.remove(resource),
isModelDetected ? isTableModel : null,
resource.getDatabaseName(),
resource,
shouldTransferModFile,
Expand Down Expand Up @@ -909,8 +894,6 @@ public int getPendingQueueSize() {

@Override
public synchronized void close() {
tsfile2IsTableModelMap.clear();

if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {
Expand Down