From dd861a526349a2f0322b12c7d2724c68668f24f0 Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Sat, 16 May 2026 19:02:40 +0800 Subject: [PATCH] Fix LIKE with dynamic pattern expressions --- ...IoTDBPipeReceiverAutoCreateDisabledIT.java | 12 +++- .../plan/planner/LogicalPlanVisitor.java | 1 + .../plan/node/load/LoadSingleTsFileNode.java | 9 +++ .../plan/node/load/LoadTsFileNode.java | 8 ++- .../PredicatePushIntoMetadataChecker.java | 5 +- ...ConvertSchemaPredicateToFilterVisitor.java | 3 +- .../relational/planner/RelationPlanner.java | 1 + .../scheduler/load/LoadTsFileScheduler.java | 70 +++++++++++++++---- .../planner/node/load/LoadTsFileNodeTest.java | 5 +- .../relational/analyzer/AnalyzerTest.java | 37 ++++++++++ 10 files changed, 132 insertions(+), 19 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java index 6d4f2b80b33cb..73e480401d8b3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java @@ -57,7 +57,17 @@ public void setUp() { @Override protected void setupConfig() { super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1); receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false); + receiverEnv + .getConfig() + .getCommonConfig() + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1); } @Test @@ -72,7 +82,7 @@ public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries() throws E "create pipe test with source ('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true') " + "with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s');", receiverEnv.getDataNodeWrapper(0).getIpAndPortString()); - final String createDatabaseSql = "create database root.test.sg;"; + final String createDatabaseSql = "create database root.test;"; final String createFirstTimeSeriesSql = "create timeseries root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;"; final String insertFirstSql = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index a9b0b9d83f4fe..1aa0bbfe76256 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -555,6 +555,7 @@ public PlanNode visitLoadFile( loadTsFileStatement.getResources(), isTableModel, loadTsFileStatement.getDatabase(), + loadTsFileStatement.getDatabaseLevel(), loadTsFileStatement.isNeedDecode4TimeColumn()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 24dd28712312a..75f3aee3bfe6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -59,6 +59,7 @@ public class LoadSingleTsFileNode extends WritePlanNode { private final TsFileResource resource; private final boolean isTableModel; private final String database; + private final int databaseLevel; private final boolean deleteAfterLoad; private final long writePointCount; private boolean needDecodeTsFile; @@ -70,6 +71,7 @@ public LoadSingleTsFileNode( final TsFileResource resource, final boolean isTableModel, final String database, + final int databaseLevel, final boolean deleteAfterLoad, final long writePointCount, final boolean needDecodeTsFile) { @@ -78,6 +80,7 @@ public LoadSingleTsFileNode( this.resource = resource; this.isTableModel = isTableModel; this.database = database; + this.databaseLevel = databaseLevel; this.deleteAfterLoad = deleteAfterLoad; this.writePointCount = writePointCount; this.needDecodeTsFile = needDecodeTsFile; @@ -175,6 +178,10 @@ public String getDatabase() { return database; } + public int getDatabaseLevel() { + return databaseLevel; + } + @Override public TRegionReplicaSet getRegionReplicaSet() { return null; @@ -258,6 +265,7 @@ public boolean equals(Object o) { && Objects.equals(resource, loadSingleTsFileNode.resource) && Objects.equals(isTableModel, loadSingleTsFileNode.isTableModel) && Objects.equals(database, loadSingleTsFileNode.database) + && Objects.equals(databaseLevel, loadSingleTsFileNode.databaseLevel) && Objects.equals(needDecodeTsFile, loadSingleTsFileNode.needDecodeTsFile) && Objects.equals(deleteAfterLoad, loadSingleTsFileNode.deleteAfterLoad) && Objects.equals(localRegionReplicaSet, loadSingleTsFileNode.localRegionReplicaSet); @@ -270,6 +278,7 @@ public int hashCode() { resource, isTableModel, database, + databaseLevel, needDecodeTsFile, deleteAfterLoad, localRegionReplicaSet); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index 26917e858d0a7..11f54cdd77ade 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -46,6 +46,7 @@ public class LoadTsFileNode extends WritePlanNode { private final List resources; private final List isTableModel; private final String database; + private final int databaseLevel; private final boolean needDecode4TimeColumn; public LoadTsFileNode( @@ -53,11 +54,13 @@ public LoadTsFileNode( final List resources, final List isTableModel, final String database, + final int databaseLevel, final boolean needDecode4TimeColumn) { super(id); this.resources = resources; this.isTableModel = isTableModel; this.database = database; + this.databaseLevel = databaseLevel; this.needDecode4TimeColumn = needDecode4TimeColumn; } @@ -126,6 +129,7 @@ private List splitByPartitionForTreeModel(Analysis analysis) { resources.get(i), isTableModel.get(i), database, + databaseLevel, statement.isDeleteAfterLoad(), statement.getWritePointCount(i), needDecode4TimeColumn)); @@ -149,6 +153,7 @@ private List splitByPartitionForTableModel( resources.get(i), isTableModel.get(i), database, + databaseLevel, statement.isDeleteAfterLoad(), statement.getWritePointCount(i), needDecode4TimeColumn)); @@ -170,11 +175,12 @@ public boolean equals(Object o) { LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o; return Objects.equals(resources, loadTsFileNode.resources) && Objects.equals(database, loadTsFileNode.database) + && Objects.equals(databaseLevel, loadTsFileNode.databaseLevel) && Objects.equals(isTableModel, loadTsFileNode.isTableModel); } @Override public int hashCode() { - return Objects.hash(resources, database, isTableModel); + return Objects.hash(resources, database, databaseLevel, isTableModel); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoMetadataChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoMetadataChecker.java index ec0f4f521c0bb..27d814ccd80f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoMetadataChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoMetadataChecker.java @@ -195,7 +195,10 @@ public Boolean visitIsNotNullPredicate(final IsNotNullPredicate node, final Void @Override public Boolean visitLikePredicate(final LikePredicate node, final Void context) { - return node.getValue().accept(this, context); + return node.getValue() instanceof SymbolReference + && node.getValue().accept(this, context) + && node.getPattern() instanceof StringLiteral + && (!node.getEscape().isPresent() || node.getEscape().get() instanceof StringLiteral); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/schema/ConvertSchemaPredicateToFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/schema/ConvertSchemaPredicateToFilterVisitor.java index 78227941a52b1..07df741ab1b96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/schema/ConvertSchemaPredicateToFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/schema/ConvertSchemaPredicateToFilterVisitor.java @@ -113,7 +113,8 @@ public SchemaFilter visitIsNotNullPredicate( final LikePredicate node, final Context context) { // TODO: Support stringLiteral like tag/attr? if (!(node.getValue() instanceof SymbolReference) - || !(node.getPattern() instanceof StringLiteral)) { + || !(node.getPattern() instanceof StringLiteral) + || (node.getEscape().isPresent() && !(node.getEscape().get() instanceof StringLiteral))) { return null; } return wrapTagOrAttributeFilter( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 949ba6a813c84..25fd7b4a10977 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -1401,6 +1401,7 @@ public RelationPlan visitLoadTsFile(final LoadTsFile node, final Void context) { node.getResources(), isTableModel, node.getDatabase(), + node.getDatabaseLevel(), node.isNeedDecode4TimeColumn()), analysis.getRootScope(), Collections.emptyList(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 6251a67689a8d..788fcaaa08b84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -30,10 +30,12 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -180,7 +182,8 @@ public void start() { final LoadSingleTsFileNode node = tsFileNodeList.get(i); final String filePath = node.getTsFileResource().getTsFilePath(); - partitionFetcher.setDatabase(getPartitionQueryDatabase(node, isGeneratedByPipe)); + partitionFetcher.setDatabase( + getPartitionQueryDatabase(node, isGeneratedByPipe), node.getDatabaseLevel()); boolean isLoadSingleTsFileSuccess = true; boolean shouldRemoveFileFromLoadingSet = false; @@ -634,7 +637,38 @@ private void convertFailedTsFilesToTabletsAndRetry() { static String getPartitionQueryDatabase( final LoadSingleTsFileNode node, final boolean isGeneratedByPipe) { - return node.isTableModel() || isGeneratedByPipe ? node.getDatabase() : null; + if (node.isTableModel()) { + return node.getDatabase(); + } + if (!isGeneratedByPipe) { + return null; + } + return node.getDatabase() != null + ? node.getDatabase() + : inferDatabaseName(node.getTsFileResource().getDevices(), node.getDatabaseLevel()); + } + + private static String inferDatabaseName(final Set devices, final int databaseLevel) { + if (devices == null || devices.isEmpty()) { + return null; + } + return inferDatabaseName(devices.iterator().next(), databaseLevel); + } + + private static String inferDatabaseName(final IDeviceID deviceID, final int databaseLevel) { + try { + final String[] deviceNodes = new PartialPath(deviceID).getNodes(); + final int databaseNodesLength = databaseLevel + 1; + if (deviceNodes.length < databaseNodesLength) { + return null; + } + final String[] databaseNodes = new String[databaseNodesLength]; + System.arraycopy(deviceNodes, 0, databaseNodes, 0, databaseNodesLength); + return new PartialPath(databaseNodes).getFullPath(); + } catch (final IllegalPathException e) { + LOGGER.warn("Failed to infer database name from device {}.", deviceID, e); + return null; + } } private LoadTsFileStatement buildRetryTreeLoadStatement( @@ -847,13 +881,15 @@ private void clear() { private static class DataPartitionBatchFetcher { private final IPartitionFetcher fetcher; private String database; + private int databaseLevel; public DataPartitionBatchFetcher(IPartitionFetcher fetcher) { this.fetcher = fetcher; } - public void setDatabase(String database) { + public void setDatabase(String database, int databaseLevel) { this.database = database; + this.databaseLevel = databaseLevel; } public List queryDataPartition( @@ -869,14 +905,17 @@ public List queryDataPartition( replicaSets.addAll( subSlotList.stream() .map( - pair -> - // database is an explicit database hint for table-model loads and - // pipe-generated tree-model loads. - database != null - ? dataPartition.getDataRegionReplicaSetForWriting( - pair.left, pair.right, database) - : dataPartition.getDataRegionReplicaSetForWriting( - pair.left, pair.right)) + pair -> { + // database is an explicit database hint for table-model loads and + // pipe-generated tree-model loads. When a pipe-generated tree-model load + // only carries database-level, infer the database from the device. + final String queryDatabase = + database != null ? database : inferDatabaseName(pair.left, databaseLevel); + return queryDatabase != null + ? dataPartition.getDataRegionReplicaSetForWriting( + pair.left, pair.right, queryDatabase) + : dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right); + }) .collect(Collectors.toList())); } return replicaSets; @@ -895,9 +934,12 @@ private List toQueryParam( DataPartitionQueryParam queryParam = new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())); // database is an explicit database hint for table-model loads and - // pipe-generated tree-model loads. - if (database != null) { - queryParam.setDatabaseName(database); + // pipe-generated tree-model loads. When a pipe-generated tree-model load + // only carries database-level, infer the database from the device. + final String queryDatabase = + database != null ? database : inferDatabaseName(entry.getKey(), databaseLevel); + if (queryDatabase != null) { + queryParam.setDatabaseName(queryDatabase); } return queryParam; }) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index e589810894610..a70de3a725bfa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -40,11 +40,14 @@ public class LoadTsFileNodeTest { public void testLoadSingleTsFileNode() { TsFileResource resource = new TsFileResource(new File("1")); String database = "root.db"; + int databaseLevel = 1; LoadSingleTsFileNode node = - new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L, false); + new LoadSingleTsFileNode( + new PlanNodeId(""), resource, false, database, databaseLevel, true, 0L, false); Assert.assertTrue(node.isDeleteAfterLoad()); Assert.assertEquals(resource, node.getTsFileResource()); Assert.assertEquals(database, node.getDatabase()); + Assert.assertEquals(databaseLevel, node.getDatabaseLevel()); Assert.assertNull(node.getLocalRegionReplicaSet()); Assert.assertNull(node.getRegionReplicaSet()); Assert.assertEquals(Collections.emptyList(), node.getChildren()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index 107070425e58b..6487429edf2a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@ -671,6 +671,43 @@ public void expressionTest() { assertNull(deviceTableScanNode.getPushDownPredicate()); assertFalse(deviceTableScanNode.getTimePredicate().isPresent()); + sql = "SELECT * FROM table1 WHERE tag1 like 'A' || '%'"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + analysis = analyzeSQL(sql, metadata, context); + symbolAllocator = new SymbolAllocator(); + logicalQueryPlan = + new TableLogicalPlanner( + context, metadata, sessionInfo, symbolAllocator, WarningCollector.NOOP) + .plan(analysis); + rootNode = logicalQueryPlan.getRootNode(); + + // Like with a non-literal pattern is evaluated by the filter operator. + assertTrue(rootNode.getChildren().get(0) instanceof FilterNode); + filterNode = (FilterNode) rootNode.getChildren().get(0); + assertTrue(filterNode.getPredicate().toString().contains("LIKE")); + assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof DeviceTableScanNode); + deviceTableScanNode = (DeviceTableScanNode) rootNode.getChildren().get(0).getChildren().get(0); + assertNull(deviceTableScanNode.getPushDownPredicate()); + assertFalse(deviceTableScanNode.getTimePredicate().isPresent()); + + sql = "SELECT * FROM table1 WHERE tag1 like concat('A', '%')"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + analysis = analyzeSQL(sql, metadata, context); + symbolAllocator = new SymbolAllocator(); + logicalQueryPlan = + new TableLogicalPlanner( + context, metadata, sessionInfo, symbolAllocator, WarningCollector.NOOP) + .plan(analysis); + rootNode = logicalQueryPlan.getRootNode(); + + assertTrue(rootNode.getChildren().get(0) instanceof FilterNode); + filterNode = (FilterNode) rootNode.getChildren().get(0); + assertTrue(filterNode.getPredicate().toString().contains("LIKE")); + assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof DeviceTableScanNode); + deviceTableScanNode = (DeviceTableScanNode) rootNode.getChildren().get(0).getChildren().get(0); + assertNull(deviceTableScanNode.getPushDownPredicate()); + assertFalse(deviceTableScanNode.getTimePredicate().isPresent()); + // 3. in / not in sql = "SELECT *, s1/2, s2+1, s2*3, s1+s2, s2%1 FROM table1 WHERE tag1 in ('A', 'B') and tag2 not in ('A', 'C')";