Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipePatternFormatIT extends AbstractPipeDualAutoIT {
public class IoTDBTreePatternFormatIT extends AbstractPipeDualAutoIT {
@Test
public void testPrefixPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private void doTransfer(
socket,
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getPatternString(),
pipeConfigRegionSnapshotEvent.getTreePatternString(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
// The pattern is surely Non-null
snapshotEvent.getPatternString(),
snapshotEvent.getTreePatternString(),
snapshotFile.getName(),
snapshotFile.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package org.apache.iotdb.confignode.manager.pipe.event;

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
Expand Down Expand Up @@ -90,7 +91,7 @@ public PipeConfigRegionSnapshotEvent() {

public PipeConfigRegionSnapshotEvent(
final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
this(snapshotPath, templateFilePath, type, null, 0, null, null);
this(snapshotPath, templateFilePath, type, null, 0, null, null, null);
}

public PipeConfigRegionSnapshotEvent(
Expand All @@ -100,8 +101,15 @@ public PipeConfigRegionSnapshotEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
super(pipeName, creationTime, pipeTaskMeta, pattern, PipeConfigNodeResourceManager.snapshot());
final TreePattern treePattern,
final TablePattern tablePattern) {
super(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
PipeConfigNodeResourceManager.snapshot());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : "";
this.fileType = type;
Expand Down Expand Up @@ -160,11 +168,19 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final TreePattern treePattern,
final TablePattern tablePattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionSnapshotEvent(
snapshotPath, templateFilePath, fileType, pipeName, creationTime, pipeTaskMeta, pattern);
snapshotPath,
templateFilePath,
fileType,
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package org.apache.iotdb.confignode.manager.pipe.event;

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
Expand All @@ -41,17 +42,18 @@ public PipeConfigRegionWritePlanEvent() {

public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
this(configPhysicalPlan, null, 0, null, null, null, isGeneratedByPipe);
}

public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final TreePattern treePattern,
final TablePattern tablePattern,
final boolean isGeneratedByPipe) {
super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
}

Expand All @@ -64,11 +66,12 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final TreePattern treePattern,
final TablePattern tablePattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionWritePlanEvent(
configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, false);
configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanVisitor;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
Expand Down Expand Up @@ -56,12 +56,12 @@

/**
* The {@link PipeConfigPhysicalPlanPatternParseVisitor} will transform the schema {@link
* ConfigPhysicalPlan}s using {@link IoTDBPipePattern}. Rule:
* ConfigPhysicalPlan}s using {@link IoTDBTreePattern}. Rule:
*
* <p>1. All patterns in the output {@link ConfigPhysicalPlan} will be the intersection of the
* original {@link ConfigPhysicalPlan}'s patterns and the given {@link IoTDBPipePattern}.
* original {@link ConfigPhysicalPlan}'s patterns and the given {@link IoTDBTreePattern}.
*
* <p>2. If a pattern does not intersect with the {@link IoTDBPipePattern}, it's dropped.
* <p>2. If a pattern does not intersect with the {@link IoTDBTreePattern}, it's dropped.
*
* <p>3. If all the patterns in the {@link ConfigPhysicalPlan} is dropped, the {@link
* ConfigPhysicalPlan} is dropped.
Expand All @@ -70,13 +70,13 @@
* one is used in the {@link PipeConfigRegionWritePlanEvent} in {@link ConfigRegionListeningQueue}.
*/
public class PipeConfigPhysicalPlanPatternParseVisitor
extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, IoTDBPipePattern> {
extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, IoTDBTreePattern> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfigPhysicalPlanPatternParseVisitor.class);

@Override
public Optional<ConfigPhysicalPlan> visitPlan(
final ConfigPhysicalPlan plan, final IoTDBPipePattern pattern) {
final ConfigPhysicalPlan plan, final IoTDBTreePattern pattern) {
return Optional.of(plan);
}

Expand All @@ -88,31 +88,31 @@ public Optional<ConfigPhysicalPlan> visitPlan(
// Other matches using "matchPrefixPath" are with the same principle.
@Override
public Optional<ConfigPhysicalPlan> visitCreateDatabase(
final DatabaseSchemaPlan createDatabasePlan, final IoTDBPipePattern pattern) {
final DatabaseSchemaPlan createDatabasePlan, final IoTDBTreePattern pattern) {
return pattern.matchPrefixPath(createDatabasePlan.getSchema().getName())
? Optional.of(createDatabasePlan)
: Optional.empty();
}

@Override
public Optional<ConfigPhysicalPlan> visitAlterDatabase(
final DatabaseSchemaPlan alterDatabasePlan, final IoTDBPipePattern pattern) {
final DatabaseSchemaPlan alterDatabasePlan, final IoTDBTreePattern pattern) {
return pattern.matchPrefixPath(alterDatabasePlan.getSchema().getName())
? Optional.of(alterDatabasePlan)
: Optional.empty();
}

@Override
public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
final DeleteDatabasePlan deleteDatabasePlan, final IoTDBPipePattern pattern) {
final DeleteDatabasePlan deleteDatabasePlan, final IoTDBTreePattern pattern) {
return pattern.matchPrefixPath(deleteDatabasePlan.getName())
? Optional.of(deleteDatabasePlan)
: Optional.empty();
}

@Override
public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
final CreateSchemaTemplatePlan createSchemaTemplatePlan, final IoTDBPipePattern pattern) {
final CreateSchemaTemplatePlan createSchemaTemplatePlan, final IoTDBTreePattern pattern) {
// This is a deserialized template and can be arbitrarily altered
final Template template = createSchemaTemplatePlan.getTemplate();
template.getSchemaMap().keySet().removeIf(measurement -> !pattern.matchTailNode(measurement));
Expand All @@ -124,7 +124,7 @@ public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
@Override
public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan,
final IoTDBPipePattern pattern) {
final IoTDBTreePattern pattern) {
return pattern.matchPrefixPath(commitSetSchemaTemplatePlan.getPath())
? Optional.of(commitSetSchemaTemplatePlan)
: Optional.empty();
Expand All @@ -133,15 +133,15 @@ public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
@Override
public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan,
final IoTDBPipePattern pattern) {
final IoTDBTreePattern pattern) {
return pattern.matchPrefixPath(pipeUnsetSchemaTemplatePlan.getPath())
? Optional.of(pipeUnsetSchemaTemplatePlan)
: Optional.empty();
}

@Override
public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final IoTDBPipePattern pattern) {
final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final IoTDBTreePattern pattern) {
final TemplateExtendInfo extendInfo = extendSchemaTemplatePlan.getTemplateExtendInfo();
final int[] filteredIndexes =
IntStream.range(0, extendInfo.getMeasurements().size())
Expand All @@ -152,41 +152,41 @@ public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
new ExtendSchemaTemplatePlan(
new TemplateExtendInfo(
extendInfo.getTemplateName(),
IoTDBPipePattern.applyIndexesOnList(
IoTDBTreePattern.applyIndexesOnList(
filteredIndexes, extendInfo.getMeasurements()),
IoTDBPipePattern.applyIndexesOnList(filteredIndexes, extendInfo.getDataTypes()),
IoTDBPipePattern.applyIndexesOnList(filteredIndexes, extendInfo.getEncodings()),
IoTDBPipePattern.applyIndexesOnList(
IoTDBTreePattern.applyIndexesOnList(filteredIndexes, extendInfo.getDataTypes()),
IoTDBTreePattern.applyIndexesOnList(filteredIndexes, extendInfo.getEncodings()),
IoTDBTreePattern.applyIndexesOnList(
filteredIndexes, extendInfo.getCompressors()))))
: Optional.empty();
}

@Override
public Optional<ConfigPhysicalPlan> visitGrantUser(
final AuthorPlan grantUserPlan, final IoTDBPipePattern pattern) {
final AuthorPlan grantUserPlan, final IoTDBTreePattern pattern) {
return visitPathRelatedAuthorPlan(grantUserPlan, pattern);
}

@Override
public Optional<ConfigPhysicalPlan> visitRevokeUser(
final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
final AuthorPlan revokeUserPlan, final IoTDBTreePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}

@Override
public Optional<ConfigPhysicalPlan> visitGrantRole(
final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
final AuthorPlan revokeUserPlan, final IoTDBTreePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}

@Override
public Optional<ConfigPhysicalPlan> visitRevokeRole(
final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
final AuthorPlan revokeUserPlan, final IoTDBTreePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}

private Optional<ConfigPhysicalPlan> visitPathRelatedAuthorPlan(
final AuthorPlan pathRelatedAuthorPlan, final IoTDBPipePattern pattern) {
final AuthorPlan pathRelatedAuthorPlan, final IoTDBTreePattern pattern) {
final List<PartialPath> intersectedPaths =
pathRelatedAuthorPlan.getNodeNameList().stream()
.map(pattern::getIntersection)
Expand All @@ -208,7 +208,7 @@ private Optional<ConfigPhysicalPlan> visitPathRelatedAuthorPlan(

@Override
public Optional<ConfigPhysicalPlan> visitPipeDeleteTimeSeries(
final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan, final IoTDBPipePattern pattern) {
final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan, final IoTDBTreePattern pattern) {
try {
final PathPatternTree intersectedTree =
pattern.getIntersection(
Expand All @@ -226,7 +226,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeleteTimeSeries(

@Override
public Optional<ConfigPhysicalPlan> visitPipeDeleteLogicalView(
final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, final IoTDBPipePattern pattern) {
final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, final IoTDBTreePattern pattern) {
try {
final PathPatternTree intersectedTree =
pattern.getIntersection(
Expand All @@ -244,7 +244,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeleteLogicalView(

@Override
public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(
final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, final IoTDBPipePattern pattern) {
final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, final IoTDBTreePattern pattern) {
final Map<PartialPath, List<Template>> newTemplateSetInfo =
pipeDeactivateTemplatePlan.getTemplateSetInfo().entrySet().stream()
.flatMap(
Expand All @@ -267,7 +267,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(

@Override
public Optional<ConfigPhysicalPlan> visitTTL(
final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) {
final SetTTLPlan setTTLPlan, final IoTDBTreePattern pattern) {
final PartialPath partialPath = new PartialPath(setTTLPlan.getPathPattern());
final List<PartialPath> intersectionList =
pattern.matchPrefixPath(partialPath.getFullPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
Expand Down Expand Up @@ -354,8 +354,8 @@ protected TSStatus loadFileV2(
final Set<ConfigPhysicalPlanType> executionTypes =
PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final IoTDBPipePattern pattern =
new IoTDBPipePattern(parameters.get(ColumnHeaderConstant.PATH_PATTERN));
final IoTDBTreePattern pattern =
new IoTDBTreePattern(parameters.get(ColumnHeaderConstant.PATH_PATTERN));
final List<TSStatus> results = new ArrayList<>();
while (generator.hasNext()) {
IoTDBConfigRegionExtractor.PATTERN_PARSE_VISITOR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
Expand Down Expand Up @@ -54,8 +54,8 @@

public class PipeConfigPhysicalPlanPatternParseVisitorTest {

private final IoTDBPipePattern prefixPathPattern = new IoTDBPipePattern("root.db.device.**");
private final IoTDBPipePattern fullPathPattern = new IoTDBPipePattern("root.db.device.s1");
private final IoTDBTreePattern prefixPathPattern = new IoTDBTreePattern("root.db.device.**");
private final IoTDBTreePattern fullPathPattern = new IoTDBTreePattern("root.db.device.s1");

@Test
public void testCreateDatabase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
Expand Down Expand Up @@ -145,15 +145,16 @@ private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDat
// Only used by events containing delete data node, no need to bind progress index here since
// delete data event does not have progress index currently
IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
.process(deleteDataEvent.getPlanNode(), (IoTDBPipePattern) deleteDataEvent.getPipePattern())
.process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) deleteDataEvent.getTreePattern())
.map(
planNode ->
new PipeSchemaRegionWritePlanEvent(
planNode,
deleteDataEvent.getPipeName(),
deleteDataEvent.getCreationTime(),
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getPipePattern(),
deleteDataEvent.getTreePattern(),
deleteDataEvent.getTablePattern(),
deleteDataEvent.isGeneratedByPipe()))
.ifPresent(
event -> {
Expand Down
Loading