Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ public PlanNode visitLoadFile(
loadTsFileStatement.getResources(),
isTableModel,
loadTsFileStatement.getDatabase(),
loadTsFileStatement.getDatabaseLevel(),
loadTsFileStatement.isNeedDecode4TimeColumn());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
private final TsFileResource resource;
private final boolean isTableModel;
private final String database;
private final int databaseLevel;
private final boolean deleteAfterLoad;
private final long writePointCount;
private boolean needDecodeTsFile;
Expand All @@ -70,6 +71,7 @@ public LoadSingleTsFileNode(
final TsFileResource resource,
final boolean isTableModel,
final String database,
final int databaseLevel,
final boolean deleteAfterLoad,
final long writePointCount,
final boolean needDecodeTsFile) {
Expand All @@ -78,6 +80,7 @@ public LoadSingleTsFileNode(
this.resource = resource;
this.isTableModel = isTableModel;
this.database = database;
this.databaseLevel = databaseLevel;
this.deleteAfterLoad = deleteAfterLoad;
this.writePointCount = writePointCount;
this.needDecodeTsFile = needDecodeTsFile;
Expand Down Expand Up @@ -175,6 +178,10 @@ public String getDatabase() {
return database;
}

public int getDatabaseLevel() {
return databaseLevel;
}

@Override
public TRegionReplicaSet getRegionReplicaSet() {
return null;
Expand Down Expand Up @@ -258,6 +265,7 @@ public boolean equals(Object o) {
&& Objects.equals(resource, loadSingleTsFileNode.resource)
&& Objects.equals(isTableModel, loadSingleTsFileNode.isTableModel)
&& Objects.equals(database, loadSingleTsFileNode.database)
&& Objects.equals(databaseLevel, loadSingleTsFileNode.databaseLevel)
&& Objects.equals(needDecodeTsFile, loadSingleTsFileNode.needDecodeTsFile)
&& Objects.equals(deleteAfterLoad, loadSingleTsFileNode.deleteAfterLoad)
&& Objects.equals(localRegionReplicaSet, loadSingleTsFileNode.localRegionReplicaSet);
Expand All @@ -270,6 +278,7 @@ public int hashCode() {
resource,
isTableModel,
database,
databaseLevel,
needDecodeTsFile,
deleteAfterLoad,
localRegionReplicaSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ public class LoadTsFileNode extends WritePlanNode {
private final List<TsFileResource> resources;
private final List<Boolean> isTableModel;
private final String database;
private final int databaseLevel;
private final boolean needDecode4TimeColumn;

public LoadTsFileNode(
final PlanNodeId id,
final List<TsFileResource> resources,
final List<Boolean> isTableModel,
final String database,
final int databaseLevel,
final boolean needDecode4TimeColumn) {
super(id);
this.resources = resources;
this.isTableModel = isTableModel;
this.database = database;
this.databaseLevel = databaseLevel;
this.needDecode4TimeColumn = needDecode4TimeColumn;
}

Expand Down Expand Up @@ -126,6 +129,7 @@ private List<WritePlanNode> splitByPartitionForTreeModel(Analysis analysis) {
resources.get(i),
isTableModel.get(i),
database,
databaseLevel,
statement.isDeleteAfterLoad(),
statement.getWritePointCount(i),
needDecode4TimeColumn));
Expand All @@ -149,6 +153,7 @@ private List<WritePlanNode> splitByPartitionForTableModel(
resources.get(i),
isTableModel.get(i),
database,
databaseLevel,
statement.isDeleteAfterLoad(),
statement.getWritePointCount(i),
needDecode4TimeColumn));
Expand All @@ -170,11 +175,12 @@ public boolean equals(Object o) {
LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o;
return Objects.equals(resources, loadTsFileNode.resources)
&& Objects.equals(database, loadTsFileNode.database)
&& Objects.equals(databaseLevel, loadTsFileNode.databaseLevel)
&& Objects.equals(isTableModel, loadTsFileNode.isTableModel);
}

@Override
public int hashCode() {
return Objects.hash(resources, database, isTableModel);
return Objects.hash(resources, database, databaseLevel, isTableModel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ public Boolean visitIsNotNullPredicate(final IsNotNullPredicate node, final Void

@Override
public Boolean visitLikePredicate(final LikePredicate node, final Void context) {
return node.getValue().accept(this, context);
return node.getValue() instanceof SymbolReference
&& node.getValue().accept(this, context)
&& node.getPattern() instanceof StringLiteral
&& (!node.getEscape().isPresent() || node.getEscape().get() instanceof StringLiteral);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public SchemaFilter visitIsNotNullPredicate(
final LikePredicate node, final Context context) {
// TODO: Support stringLiteral like tag/attr?
if (!(node.getValue() instanceof SymbolReference)
|| !(node.getPattern() instanceof StringLiteral)) {
|| !(node.getPattern() instanceof StringLiteral)
|| (node.getEscape().isPresent() && !(node.getEscape().get() instanceof StringLiteral))) {
return null;
}
return wrapTagOrAttributeFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,7 @@ public RelationPlan visitLoadTsFile(final LoadTsFile node, final Void context) {
node.getResources(),
isTableModel,
node.getDatabase(),
node.getDatabaseLevel(),
node.isNeedDecode4TimeColumn()),
analysis.getRootScope(),
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
Expand Down Expand Up @@ -180,7 +182,8 @@ public void start() {
final LoadSingleTsFileNode node = tsFileNodeList.get(i);
final String filePath = node.getTsFileResource().getTsFilePath();

partitionFetcher.setDatabase(getPartitionQueryDatabase(node, isGeneratedByPipe));
partitionFetcher.setDatabase(
getPartitionQueryDatabase(node, isGeneratedByPipe), node.getDatabaseLevel());

boolean isLoadSingleTsFileSuccess = true;
boolean shouldRemoveFileFromLoadingSet = false;
Expand Down Expand Up @@ -634,7 +637,38 @@ private void convertFailedTsFilesToTabletsAndRetry() {

static String getPartitionQueryDatabase(
final LoadSingleTsFileNode node, final boolean isGeneratedByPipe) {
return node.isTableModel() || isGeneratedByPipe ? node.getDatabase() : null;
if (node.isTableModel()) {
return node.getDatabase();
}
if (!isGeneratedByPipe) {
return null;
}
return node.getDatabase() != null
? node.getDatabase()
: inferDatabaseName(node.getTsFileResource().getDevices(), node.getDatabaseLevel());
}

private static String inferDatabaseName(final Set<IDeviceID> devices, final int databaseLevel) {
if (devices == null || devices.isEmpty()) {
return null;
}
return inferDatabaseName(devices.iterator().next(), databaseLevel);
}

private static String inferDatabaseName(final IDeviceID deviceID, final int databaseLevel) {
try {
final String[] deviceNodes = new PartialPath(deviceID).getNodes();
final int databaseNodesLength = databaseLevel + 1;
if (deviceNodes.length < databaseNodesLength) {
return null;
}
final String[] databaseNodes = new String[databaseNodesLength];
System.arraycopy(deviceNodes, 0, databaseNodes, 0, databaseNodesLength);
return new PartialPath(databaseNodes).getFullPath();
} catch (final IllegalPathException e) {
LOGGER.warn("Failed to infer database name from device {}.", deviceID, e);
return null;
}
}

private LoadTsFileStatement buildRetryTreeLoadStatement(
Expand Down Expand Up @@ -847,13 +881,15 @@ private void clear() {
private static class DataPartitionBatchFetcher {
private final IPartitionFetcher fetcher;
private String database;
private int databaseLevel;

public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
this.fetcher = fetcher;
}

public void setDatabase(String database) {
public void setDatabase(String database, int databaseLevel) {
this.database = database;
this.databaseLevel = databaseLevel;
}

public List<TRegionReplicaSet> queryDataPartition(
Expand All @@ -869,14 +905,17 @@ public List<TRegionReplicaSet> queryDataPartition(
replicaSets.addAll(
subSlotList.stream()
.map(
pair ->
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
database != null
? dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right, database)
: dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right))
pair -> {
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads. When a pipe-generated tree-model load
// only carries database-level, infer the database from the device.
final String queryDatabase =
database != null ? database : inferDatabaseName(pair.left, databaseLevel);
return queryDatabase != null
? dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right, queryDatabase)
: dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right);
})
.collect(Collectors.toList()));
}
return replicaSets;
Expand All @@ -895,9 +934,12 @@ private List<DataPartitionQueryParam> toQueryParam(
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue()));
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
if (database != null) {
queryParam.setDatabaseName(database);
// pipe-generated tree-model loads. When a pipe-generated tree-model load
// only carries database-level, infer the database from the device.
final String queryDatabase =
database != null ? database : inferDatabaseName(entry.getKey(), databaseLevel);
if (queryDatabase != null) {
queryParam.setDatabaseName(queryDatabase);
}
return queryParam;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ public class LoadTsFileNodeTest {
public void testLoadSingleTsFileNode() {
TsFileResource resource = new TsFileResource(new File("1"));
String database = "root.db";
int databaseLevel = 1;
LoadSingleTsFileNode node =
new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L, false);
new LoadSingleTsFileNode(
new PlanNodeId(""), resource, false, database, databaseLevel, true, 0L, false);
Assert.assertTrue(node.isDeleteAfterLoad());
Assert.assertEquals(resource, node.getTsFileResource());
Assert.assertEquals(database, node.getDatabase());
Assert.assertEquals(databaseLevel, node.getDatabaseLevel());
Assert.assertNull(node.getLocalRegionReplicaSet());
Assert.assertNull(node.getRegionReplicaSet());
Assert.assertEquals(Collections.emptyList(), node.getChildren());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,43 @@ public void expressionTest() {
assertNull(deviceTableScanNode.getPushDownPredicate());
assertFalse(deviceTableScanNode.getTimePredicate().isPresent());

sql = "SELECT * FROM table1 WHERE tag1 like 'A' || '%'";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
analysis = analyzeSQL(sql, metadata, context);
symbolAllocator = new SymbolAllocator();
logicalQueryPlan =
new TableLogicalPlanner(
context, metadata, sessionInfo, symbolAllocator, WarningCollector.NOOP)
.plan(analysis);
rootNode = logicalQueryPlan.getRootNode();

// Like with a non-literal pattern is evaluated by the filter operator.
assertTrue(rootNode.getChildren().get(0) instanceof FilterNode);
filterNode = (FilterNode) rootNode.getChildren().get(0);
assertTrue(filterNode.getPredicate().toString().contains("LIKE"));
assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof DeviceTableScanNode);
deviceTableScanNode = (DeviceTableScanNode) rootNode.getChildren().get(0).getChildren().get(0);
assertNull(deviceTableScanNode.getPushDownPredicate());
assertFalse(deviceTableScanNode.getTimePredicate().isPresent());

sql = "SELECT * FROM table1 WHERE tag1 like concat('A', '%')";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
analysis = analyzeSQL(sql, metadata, context);
symbolAllocator = new SymbolAllocator();
logicalQueryPlan =
new TableLogicalPlanner(
context, metadata, sessionInfo, symbolAllocator, WarningCollector.NOOP)
.plan(analysis);
rootNode = logicalQueryPlan.getRootNode();

assertTrue(rootNode.getChildren().get(0) instanceof FilterNode);
filterNode = (FilterNode) rootNode.getChildren().get(0);
assertTrue(filterNode.getPredicate().toString().contains("LIKE"));
assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof DeviceTableScanNode);
deviceTableScanNode = (DeviceTableScanNode) rootNode.getChildren().get(0).getChildren().get(0);
assertNull(deviceTableScanNode.getPushDownPredicate());
assertFalse(deviceTableScanNode.getTimePredicate().isPresent());

// 3. in / not in
sql =
"SELECT *, s1/2, s2+1, s2*3, s1+s2, s2%1 FROM table1 WHERE tag1 in ('A', 'B') and tag2 not in ('A', 'C')";
Expand Down