Skip to content

Commit

Permalink
Delete dataTypeMapping etc fields in QueryPlan (#934)
Browse files Browse the repository at this point in the history
* Remove the dataTypeMapping field in QueryPlan for saving memory
  • Loading branch information
Alima777 committed Mar 24, 2020
1 parent d384cb5 commit 94411e0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand All @@ -29,8 +28,8 @@
public class AlignByDevicePlan extends QueryPlan {

private List<String> measurements; // to record result measurement columns, e.g. temperature, status, speed
private Map<String, Set<String>> deviceToMeasurementsMap; // e.g. root.ln.d1 -> temperature
// to check data type consistency for the same name sensor of different devices
private List<String> devices;
private Map<String, TSDataType> measurementDataTypeMap;
private Map<String, IExpression> deviceToFilterMap;
// to record different kinds of measurement
Expand All @@ -52,13 +51,12 @@ public List<String> getMeasurements() {
return measurements;
}

public void setDeviceToMeasurementsMap(
Map<String, Set<String>> deviceToMeasurementsMap) {
this.deviceToMeasurementsMap = deviceToMeasurementsMap;
public void setDevices(List<String> devices) {
this.devices = devices;
}

public Map<String, Set<String>> getDeviceToMeasurementsMap() {
return deviceToMeasurementsMap;
public List<String> getDevices() {
return devices;
}

public void setMeasurementDataTypeMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand All @@ -30,7 +28,6 @@ public abstract class QueryPlan extends PhysicalPlan {

private List<Path> paths = null;
private List<TSDataType> dataTypes = null;
private Map<Path, TSDataType> dataTypeMapping = new HashMap<>();
private boolean alignByTime = true; // for disable align sql

private int rowLimit = 0;
Expand Down Expand Up @@ -90,12 +87,4 @@ public void setAlignByTime(boolean align) {
alignByTime = align;
}

public Map<Path, TSDataType> getDataTypeMapping() {
return dataTypeMapping;
}

public void addTypeMapping(Path path, TSDataType dataType) {
dataTypeMapping.put(path, dataType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -275,7 +274,6 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)

// to record result measurement columns
List<String> measurements = new ArrayList<>();
Map<String, Set<String>> deviceToMeasurementsMap = new LinkedHashMap<>();
// to check the same measurement of different devices having the same datatype
Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
Expand Down Expand Up @@ -340,11 +338,6 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
|| measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
}
// update deviceToMeasurementsMap
if (!deviceToMeasurementsMap.containsKey(device)) {
deviceToMeasurementsMap.put(device, new HashSet<>());
}
deviceToMeasurementsMap.get(device).add(measurementChecked);
// update paths
paths.add(path);
}
Expand Down Expand Up @@ -375,7 +368,7 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)

// assigns to alignByDevicePlan
alignByDevicePlan.setMeasurements(measurements);
alignByDevicePlan.setDeviceToMeasurementsMap(deviceToMeasurementsMap);
alignByDevicePlan.setDevices(devices);
alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
alignByDevicePlan.setPaths(paths);
Expand Down Expand Up @@ -411,11 +404,10 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
}
}
try {
generateDataTypes(queryPlan);
deduplicate(queryPlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
deduplicate(queryPlan);

queryPlan.setRowLimit(queryOperator.getRowLimit());
queryPlan.setRowOffset(queryOperator.getRowOffset());
Expand Down Expand Up @@ -490,35 +482,42 @@ private void concatFilterPath(String prefix, FilterOperator operator,
basicOperator.setSinglePath(concatPath);
}

private void generateDataTypes(QueryPlan queryPlan) throws MetadataException {
private void deduplicate(QueryPlan queryPlan) throws MetadataException {
// generate dataType first
List<Path> paths = queryPlan.getPaths();
List<TSDataType> dataTypes = getSeriesTypes(paths);
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
TSDataType dataType = dataTypes.get(i);
queryPlan.addTypeMapping(path, dataType);
}
queryPlan.setDataTypes(dataTypes);
}

private void deduplicate(QueryPlan queryPlan) {
// deduplicate from here
if (queryPlan instanceof AlignByDevicePlan) {
return;
}
if (queryPlan instanceof AggregationPlan) {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
deduplicateAggregation(aggregationPlan);
List<String> aggregations = aggregationPlan.getAggregations();

Set<String> columnSet = new HashSet<>();
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
String column = aggregations.get(i) + "(" + path.toString() + ")";
if (!columnSet.contains(column)) {
aggregationPlan.addDeduplicatedPaths(path);
TSDataType seriesType = dataTypes.get(i);
aggregationPlan.addDeduplicatedDataTypes(seriesType);
aggregationPlan.addDeduplicatedAggregations(aggregations.get(i));
columnSet.add(column);
}
}
return;
}
RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
List<Path> paths = queryPlan.getPaths();

Set<String> columnSet = new HashSet<>();
Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
for (Path path : paths) {
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
String column = path.toString();
if (!columnSet.contains(column)) {
TSDataType seriesType = dataTypeMapping.get(path);
TSDataType seriesType = dataTypes.get(i);
rawDataQueryPlan.addDeduplicatedPaths(path);
rawDataQueryPlan.addDeduplicatedDataTypes(seriesType);
columnSet.add(column);
Expand All @@ -543,26 +542,6 @@ private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit,
return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
}


private void deduplicateAggregation(AggregationPlan queryPlan) {
List<Path> paths = queryPlan.getPaths();
List<String> aggregations = queryPlan.getAggregations();

Set<String> columnSet = new HashSet<>();
Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
String column = aggregations.get(i) + "(" + path.toString() + ")";
if (!columnSet.contains(column)) {
queryPlan.addDeduplicatedPaths(path);
TSDataType seriesType = dataTypeMapping.get(path);
queryPlan.addDeduplicatedDataTypes(seriesType);
queryPlan.addDeduplicatedAggregations(aggregations.get(i));
columnSet.add(column);
}
}
}

protected List<String> getMatchedTimeseries(String path) throws MetadataException {
return MManager.getInstance().getAllTimeseriesName(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
Expand Down Expand Up @@ -56,32 +59,31 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private IExpression expression;

private List<String> measurements;
private Map<String, Set<String>> deviceToMeasurementsMap;
private List<String> devices;
private Map<String, IExpression> deviceToFilterMap;
private Map<String, MeasurementType> measurementTypeMap;

private Map<String, TSDataType> measurementDataTpeMap;

private GroupByPlan groupByPlan;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
private RawDataQueryPlan rawDataQueryPlan;

private boolean curDataSetInitialized;
private Iterator<String> deviceIterator;
private String currentDevice;
private QueryDataSet currentDataSet;
private Map<Path, TSDataType> tsDataTypeMap;
private Iterator<String> deviceIterator;
private List<String> executeColumns;

public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
IQueryRouter queryRouter) {
super(null, alignByDevicePlan.getDataTypes());

this.measurements = alignByDevicePlan.getMeasurements();
this.tsDataTypeMap = alignByDevicePlan.getDataTypeMapping();
this.devices = alignByDevicePlan.getDevices();
this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
this.queryRouter = queryRouter;
this.context = context;
this.deviceToMeasurementsMap = alignByDevicePlan.getDeviceToMeasurementsMap();
this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
this.measurementTypeMap = alignByDevicePlan.getMeasurementTypeMap();

Expand All @@ -104,7 +106,7 @@ public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext co
}

this.curDataSetInitialized = false;
this.deviceIterator = deviceToMeasurementsMap.keySet().iterator();
this.deviceIterator = devices.iterator();
}

protected boolean hasNextWithoutConstraint() throws IOException {
Expand All @@ -116,25 +118,32 @@ protected boolean hasNextWithoutConstraint() throws IOException {

while (deviceIterator.hasNext()) {
currentDevice = deviceIterator.next();
Set<String> measurementColumnsOfGivenDevice = deviceToMeasurementsMap
.get(currentDevice);
executeColumns = new ArrayList<>(measurementColumnsOfGivenDevice);

// extract paths and aggregations if exist from executeColumns
// get all measurements of current device
Set<String> measurementOfGivenDevice;
try {
MNode deviceNode = MManager.getInstance().getNodeByPath(currentDevice);
measurementOfGivenDevice = deviceNode.getChildren().keySet();
} catch (MetadataException e) {
throw new IOException("Cannot get node from " + currentDevice);
}
// extract paths and aggregations queried from all measurements
// executeColumns is for calculating rowRecord
executeColumns = new ArrayList<>();
List<Path> executePaths = new ArrayList<>();
List<TSDataType> tsDataTypes = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
for (String column : executeColumns) {
for (String column : measurementDataTpeMap.keySet()) {
String measurement = column;
if (dataSetType == DataSetType.GROUPBY || dataSetType == DataSetType.AGGREGATE) {
Path path = new Path(currentDevice,
column.substring(column.indexOf('(') + 1, column.indexOf(')')));
tsDataTypes.add(tsDataTypeMap.get(path));
executePaths.add(path);
executeAggregations.add(column.substring(0, column.indexOf('(')));
} else {
Path path = new Path(currentDevice, column);
tsDataTypes.add(tsDataTypeMap.get(path));
executePaths.add(path);
measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
if (measurementOfGivenDevice.contains(measurement)) {
executeAggregations.add(column.substring(0, column.indexOf('(')));
}
}
if (measurementOfGivenDevice.contains(measurement)) {
executeColumns.add(column);
executePaths.add(new Path(currentDevice, measurement));
tsDataTypes.add(measurementDataTpeMap.get(column));
}
}

Expand Down

0 comments on commit 94411e0

Please sign in to comment.