Skip to content

Commit

Permalink
[IOTDB-305] Add value filter function while executing group by device (
Browse files Browse the repository at this point in the history
…#687)

* do not join on timestamps between devices when using 'group by device'
  • Loading branch information
Alima777 authored and Jialin Qiao committed Jan 2, 2020
1 parent b17f402 commit 64d5ed1
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,14 @@ List<String> getAllStorageGroupNames() {
return mtree.getAllStorageGroupList();
}

Set<String> getAllDevices() throws SQLException {
List<String> getAllDevices() throws PathException {
return mtree.getAllDevices();
}

public List<String> getDevices(String prefixPath) throws PathException {
return mtree.getDevices(prefixPath);
}

List<String> getNodesList(String schemaPattern, int nodeLevel) throws SQLException {
return mtree.getNodesList(schemaPattern, nodeLevel);
}
Expand Down
19 changes: 17 additions & 2 deletions server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,9 @@ public Metadata getMetadata() throws PathException {
/**
* Get the full devices info.
*
* @return A HashSet instance which stores all devices info
* @return A list instance which stores all device names
*/
public Set<String> getAllDevices() throws SQLException {
public List<String> getAllDevices() throws PathException {

lock.readLock().lock();
try {
Expand All @@ -880,6 +880,21 @@ public Set<String> getAllDevices() throws SQLException {
}
}

/**
* Get devices info with given prefixPath.
*
* @return A HashSet instance which stores devices names with given prefixPath.
*/
public List<String> getDevices(String prefixPath) throws PathException {

lock.readLock().lock();
try {
return mgraph.getDevices(prefixPath);
} finally {
lock.readLock().unlock();
}
}

/**
* Get all nodes from the given level
*
Expand Down
67 changes: 48 additions & 19 deletions server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,30 +760,59 @@ private void findStorageGroup(MNode node, String path, List<String> res) {
/**
* Get all devices in current Metadata Tree.
*
* @return a list contains all distinct devices
* @return a list contains all distinct device names
*/
Set<String> getAllDevices() {
HashSet<String> devices = new HashSet<>();
MNode node;
if ((node = getRoot()) != null) {
findDevices(node, SQLConstant.ROOT, devices);
}
return new LinkedHashSet<>(devices);
List<String> getAllDevices() throws PathException {
return getDevices(SQLConstant.ROOT);
}

private void findDevices(MNode node, String path, HashSet<String> res) {
if (node == null) {
return;
/**
* Get all devices in current Metadata Tree with prefixPath.
*
* @return a list contains all distinct devices names
*/
List<String> getDevices(String prefixPath) throws PathException {
String[] nodes = MetaUtils.getNodeNames(prefixPath, PATH_SEPARATOR);
if (nodes.length == 0 || !nodes[0].equals(getRoot().getName())) {
throw new MTreePathException("PrefixPath", prefixPath);
}
if (node.isLeaf()) {
res.add(path);
return;
List<String> devices = new ArrayList<>();
findDevices(getRoot(), nodes, 1, "", devices);
return devices;
}

/**
* Traverse the MTree to match all devices with prefix path.
* @param node the current traversing node
* @param nodes split the prefix path with '.'
* @param idx the current index of array nodes
* @param parent store the node string having traversed
* @param res store all matched device names
*/
private void findDevices(MNode node, String[] nodes, int idx, String parent, List<String> res) {
String nodeReg;
if (idx >= nodes.length) {
nodeReg = "*";
} else {
nodeReg = nodes[idx];
}
for (MNode child : node.getChildren().values()) {
if (child.isLeaf()) {
res.add(path);
} else {
findDevices(child, path + "." + child.toString(), res);
if (!("*").equals(nodeReg)) {
if (node.hasChild(nodeReg)) {
if(node.getChild(nodeReg).isLeaf()){
res.add(parent + node.getName());
} else{
findDevices(node.getChild(nodeReg), nodes, idx + 1, parent + node.getName() + ".", res);
}
}
} else {
boolean deviceAdded = false;
for (MNode child : node.getChildren().values()) {
if(child.isLeaf() && !deviceAdded){
res.add(parent + node.getName());
deviceAdded = true;
} else if (!child.isLeaf()){
findDevices(child, nodes, idx + 1, parent + node.getName() + ".", res);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,21 @@
*/
package org.apache.iotdb.db.qp.executor;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

import java.io.IOException;
import java.util.List;

public interface IQueryProcessExecutor {

/**
Expand Down Expand Up @@ -128,6 +121,6 @@ void update(Path path, long startTime, long endTime, String value)
/**
* Get all paths of a full path
*/
List<String> getAllPaths(String originPath) throws MetadataException;
List<String> getAllMatchedPaths(String originPath) throws MetadataException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,10 @@
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
Expand All @@ -69,7 +62,6 @@
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
Expand Down Expand Up @@ -459,7 +451,7 @@ public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProces
}

@Override
public List<String> getAllPaths(String originPath) throws MetadataException {
public List<String> getAllMatchedPaths(String originPath) throws MetadataException {
return MManager.getInstance().getPaths(originPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
BasicFunctionOperator that = (BasicFunctionOperator) o;
return Objects.equals(path, that.path) &&
Objects.equals(value, that.value) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
*/
package org.apache.iotdb.db.qp.logical.crud;

import static org.apache.iotdb.db.qp.constant.SQLConstant.KW_AND;
import static org.apache.iotdb.db.qp.constant.SQLConstant.KW_OR;

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
Expand All @@ -37,6 +32,12 @@
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.StringContainer;

import java.util.ArrayList;
import java.util.List;

import static org.apache.iotdb.db.qp.constant.SQLConstant.KW_AND;
import static org.apache.iotdb.db.qp.constant.SQLConstant.KW_OR;

/**
* This class is for filter operator and implements {@link Operator} . It may consist of more than
* two child FilterOperators, but if it's not a leaf operator, the relation is the same among all of
Expand Down Expand Up @@ -208,8 +209,10 @@ public boolean equals(Object fil) {
if (!(fil instanceof FilterOperator)) {
return false;
}
// if child is leaf, will execute BasicFunctionOperator.equals()
FilterOperator operator = (FilterOperator) fil;
return compareTo(operator) == 0;
return this.tokenIntType == operator.tokenIntType
&& this.getChildren().equals(operator.getChildren());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public class QueryPlan extends PhysicalPlan {
private int rowOffset = 0;

private boolean isGroupByDevice = false; // for group by device sql
private List<String> measurementColumnList; // for group by device sql
private Map<String, Set<String>> measurementColumnsGroupByDevice; // for group by device sql
private Map<String, TSDataType> dataTypeConsistencyChecker; // for group by device sql
private List<String> measurements; // for group by device sql, e.g. temperature
private Map<String, Set<String>> measurementsGroupByDevice; // for group by device sql, e.g. root.ln.d1 -> temperature
private Map<String, TSDataType> dataTypeConsistencyChecker; // for group by device sql, e.g. root.ln.d1.temperature -> Float
private Map<Path, TSDataType> dataTypeMapping = new HashMap<>(); // for group by device sql

public QueryPlan() {
Expand Down Expand Up @@ -140,21 +140,21 @@ public void setGroupByDevice(boolean groupByDevice) {
isGroupByDevice = groupByDevice;
}

public void setMeasurementColumnList(List<String> measurementColumnList) {
this.measurementColumnList = measurementColumnList;
public void setMeasurements(List<String> measurements) {
this.measurements = measurements;
}

public List<String> getMeasurementColumnList() {
return measurementColumnList;
public List<String> getMeasurements() {
return measurements;
}

public void setMeasurementColumnsGroupByDevice(
Map<String, Set<String>> measurementColumnsGroupByDevice) {
this.measurementColumnsGroupByDevice = measurementColumnsGroupByDevice;
public void setMeasurementsGroupByDevice(
Map<String, Set<String>> measurementsGroupByDevice) {
this.measurementsGroupByDevice = measurementsGroupByDevice;
}

public Map<String, Set<String>> getMeasurementColumnsGroupByDevice() {
return measurementColumnsGroupByDevice;
public Map<String, Set<String>> getMeasurementsGroupByDevice() {
return measurementsGroupByDevice;
}

public void setDataTypeConsistencyChecker(
Expand Down

0 comments on commit 64d5ed1

Please sign in to comment.