From f514abd99a99a21ee60279b799a95290036590a6 Mon Sep 17 00:00:00 2001 From: Jialin Qiao Date: Thu, 5 Mar 2020 19:40:50 +0800 Subject: [PATCH] [IOTDB-536] Process about nonExistMeasurement and ConstantMeasurement in AlignByDevice is complex and not intuitive (#874) --- .../5-DataQuery/7-AlignByDeviceQuery.md | 203 ++++++++++++++++++ .../iotdb/db/qp/constant/SQLConstant.java | 2 +- .../qp/physical/crud/AlignByDevicePlan.java | 103 +++------ .../db/qp/strategy/PhysicalGenerator.java | 41 ++-- .../query/dataset/AlignByDeviceDataSet.java | 98 +++------ .../iotdb/db/service/TSServiceImpl.java | 133 ++++-------- 6 files changed, 321 insertions(+), 259 deletions(-) create mode 100644 docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md new file mode 100644 index 000000000000..25042e25dc03 --- /dev/null +++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md @@ -0,0 +1,203 @@ + + +# 按设备对齐查询 + +AlignByDevicePlan 即按设备对齐查询对应的表结构为: + +| Time | Device | sensor1 | sensor2 | sensor3 | ... | +| ---- | ------ | ------- | ------- | ------- | --- | +| | | | | | | + +## 设计原理 + +按设备对齐查询其实现原理主要是计算出查询中每个设备对应的测点和过滤条件,然后将查询按设备分别进行,最后将结果集拼装并返回。 + +### AlignByDevicePlan 中重要字段含义 + +首先解释一下 AlignByDevicePlan 中一些重要字段的含义: +- `List measurements`:查询中出现的 measurement 列表。 +- `Map dataTypeMapping`: 该变量继承自基类 QueryPlan,其主要作用是在计算每个设备的执行路径时,提供此次查询的 paths 对应的数据类型。 +- `Map> deviceToMeasurementsMap`, `Map deviceToFilterMap`: 这两个字段分别用来存储设备对应的测点和过滤条件。 +- `Map measurementDataTypeMap`:AlignByDevicePlan 要求不同设备的同名 sensor 数据类型一致,该字段是一个 `measurementName -> dataType` 的 Map 结构,用来验证同名 sensor 的数据类型一致性。如 `root.sg.d1.s1` 和 `root.sg.d2.s1` 应该是同一数据类型。 +- `enum MeasurementType`:记录三种 measurement 类型。在任何设备中都不存在的 measurement 为 `NonExist` 类型;有单引号或双引号的 measurement 为 `Constant` 类型;存在的 measurement 为 `Exist` 类型。 +- `Map measurementTypeMap`: 该字段是一个 `measureName -> measurementType` 的 Map 结构,用来记录查询中所有 measurement 的类型。 +- groupByPlan, fillQueryPlan, aggregationPlan:为了避免冗余,这三个执行计划被设定为 RawDataQueryPlan 的子类,而在 AlignByDevicePlan 中被设置为变量。如果查询计划属于这三个计划中的一种,则该字段会被赋值并保存。 + +在进行具体实现过程的讲解前,先给出一个覆盖较为完整的例子,下面的解释过程中将结合该示例进行说明。 + +```sql +SELECT s1, "1", *, s2, s5 FROM root.sg.d1, root.sg.* WHERE time = 1 AND s1 < 25 ALIGN BY DEVICE +``` + +其中,系统中的时间序列为: + +- root.sg.d1.s1 +- root.sg.d1.s2 +- root.sg.d2.s1 + +存储组 `root.sg` 共包含两个设备 d1 和 d2,其中 d1 有两个传感器 s1 和 s2,d2 只有传感器 s1,相同传感器 s1 的数据类型相同。 + +下面将按具体过程进行分别解释: + +### 逻辑计划生成 + +- org.apache.iotdb.db.qp.Planner + +与原始数据查询不同,按设备对齐查询并不在此阶段进行 SELECT 语句和 WHERE 语句中后缀路径的拼接,而将在后续生成物理计划时,计算出每个设备对应的映射值和过滤条件。因此,按设备对齐在此阶段所做的工作只包括对 WHERE 语句中过滤条件的优化。 + +对过滤条件的优化主要包括三部分:去非、转化析取范式、合并同路径过滤条件。对应的优化器分别为:RemoveNotOptimizer, DnfFilterOptimizer, MergeSingleFilterOptimizer。该部分逻辑可参考:[Planner](/#/SystemDesign/progress/chap2/sec2). + +### 物理计划生成 + +- org.apache.iotdb.db.qp.strategy.PhysicalGenerator + +生成逻辑计划后,将调用 PhysicalGenerator 类中的 `transformToPhysicalPlan()` 方法将该逻辑计划转化为物理计划。对于按设备对齐查询,该方法的主要逻辑实现在 `transformQuery()` 方法中。 + +**该阶段所做的主要工作为生成查询对应的** `AlignByDevicePlan`,**填充其中的变量信息。** + +首先解释一下 `transformQuery()` 方法中一些重要字段的含义(与 AlignByDevicePlan 中重复的字段见上文): + +- prefixPaths, suffixPaths:前者为 FROM 子句中的前缀路径,示例中为 `[root.sg.d1, root.sg.*]`; 后者为 SELECT 子句中的后缀路径,示例中为 `[s1, "1", *, s2, s5]`. +- devices:对前缀路径去通配符和设备去重后得到的设备列表,示例中为 `[root.sg.d1, root.sg.d2]`。 +- measurementSetOfGivenSuffix:中间变量,记录某一 suffix 对应的 measurement,示例中,对于后缀 \*, `measurementSetOfGivenSuffix = {s1,s2}`,对于后缀 s1, `measurementSetOfGivenSuffix = {s1}`; + +接下来介绍 AlignByDevicePlan 的计算过程: + +1. 检查查询类型是否为 groupByPlan, fillQueryPlan, aggregationPlan 这三类查询中的一种,如果是则对相应的变量进行赋值,并更改 `AlignByDevicePlan` 的查询类型。 +2. 遍历 SELECT 后缀路径,对每一个后缀路径设置一个中间变量为 `measurementSetOfGivenSuffix`,用来记录该后缀路径对应的所有 measurement。如果后缀路径以单引号或双引号开头,则直接在 `measurements` 中增加该值,并记录其类型为 `Constant` 类型。 +3. 否则将设备列表与该后缀路径拼接,得到完整的路径,如果拼接后的路径不存在,需要进一步判断该 measurement 是否在其它设备中存在,如果都没有则暂时识别为 `NonExist`,如果后续出现设备存在该 measurement,则覆盖 `NonExist` 值为 `Exist`。 +4. 如果拼接后路径存在,则证明 measurement 是 `Exist` 类型,需要检验数据类型的一致性,不满足返回错误信息,满足则记录下该 Measurement,对 `measurementSetOfGivenSuffix`, `deviceToMeasurementsMap` 等进行更新。 +5. 在一层 suffix 循环结束后,将该层循环中出现的 `measurementSetOfGivenSuffix` 加入 `measurements` 中。在整个循环结束后,将循环中得到的变量信息赋值到 AlignByDevicePlan 中。此处得到的 measurements 列表是未经过去重的,在生成 `ColumnHeader` 时将进行去重。 +6. 最后调用 `concatFilterByDevice()` 方法计算 `deviceToFilterMap`,得到将每个设备分别拼接后对应的 Filter 信息。 + +```java +Map concatFilterByDevice(List devices, + FilterOperator operator) +输入:去重后的 devices 列表和未拼接的 FilterOperator +输入:经过拼接后的 deviceToFilterMap,记录了每个设备对应的 Filter 信息 +``` + +`concatFilterByDevice()` 方法的主要处理逻辑在 `concatFilterPath()` 中: + +`concatFilterPath()` 方法遍历未拼接的 FilterOperator 二叉树,判断节点是否为叶子节点,如果是,则取该叶子结点的路径,如果路径以 time 或 root 开头则不做处理,否则将设备名与节点路径进行拼接后返回;如果不是,则对该节点的所有子节点进行迭代处理。示例中,设备1过滤条件拼接后的结果为 `time = 1 AND root.sg.d1.s1 < 25`,设备2为 `time = 1 AND root.sg.d2.s1 < 25`。 + +下面用示例总结一下通过该阶段计算得到的变量信息: + +- measurement 列表 `measurements`:`[s1, "1", s1, s2, s2, s5]` +- measurement 类型 `measurementTypeMap`: + - `s1 -> Exist` + - `s2 -> Exist` + - `"1" -> Constant` + - `s5 -> NonExist` +- 每个设备的测点 `deviceToMeasurementsMap`: + - `root.sg.d1 -> s1, s2` + - `root.sg.d2 -> s1` +- 每个设备的过滤条件 `deviceToFilterMap`: + - `root.sg.d1 -> time = 1 AND root.sg.d1.s1 < 25` + - `root.sg.d2 -> time = 1 AND root.sg.d2.s1 < 25` + +### 构造表头 (ColumnHeader) + +- org.apache.iotdb.db.service.TSServiceImpl + +在生成物理计划后,则可以执行 TSServiceImpl 中的 executeQueryStatement() 方法生成结果集并返回,其中第一步是构造表头。 + +按设备对齐查询在调用 `TSServiceImpl.getQueryColumnHeaders()` 方法后,根据查询类型进入 `TSServiceImpl.getAlignByDeviceQueryHeaders()` 来构造表头。 + +`getAlignByDeviceQueryHeaders()` 方法声明如下: + +```java +private void getAlignByDeviceQueryHeaders( + AlignByDevicePlan plan, List respColumns, List columnTypes) +输入:当前执行的物理计划 AlignByDevicePlan 和需要输出的列名 respColumns 以及其对应的数据类型 columnTypes +输出:计算得到的列名 respColumns 和数据类型 columnTypes +``` + +其具体实现逻辑如下: + +1. 首先加入 `Device` 列,其数据类型为 `TEXT`; +2. 遍历未去重的 measurements 列表,判断当前遍历 measurement 的类型,如果是 `Exist` 类型则从 `measurementTypeMap` 中取得其类型;其余两种类型设其类型为 `TEXT`,然后将 measurement 及其类型加入表头数据结构中。 +3. 根据中间变量 `deduplicatedMeasurements` 对 measurements 进行去重。 + +最终得到的 Header 为: + +| Time | Device | s1 | 1 | s1 | s2 | s2 | s5 | +| ---- | ------ | --- | --- | --- | --- | --- | --- | +| | | | | | | | | + +去重后的 `measurements` 为 `[s1, "1", s2, s5]`。 + +### 结果集生成 + +生成 ColumnHeader 后,最后一步为生成结果集填充结果并返回。 + +#### 结果集创建 + +- org.apache.iotdb.db.service.TSServiceImpl + +该阶段需要调用 `TSServiceImpl.createQueryDataSet()` 创建一个新的结果集,这部分实现逻辑较为简单,对于 AlignByDeviceQuery 而言,只需要新建一个 `AlignByDeviceDataSet` 即可,在构造函数中将把 AlignByDevicePlan 中的参数赋值到新建的结果集中。 + +#### 结果集填充 + +- org.apache.iotdb.db.utils.QueryDataSetUtils + +接下来需要填充结果,AlignByDeviceQuery 将调用 `TSServiceImpl.fillRpcReturnData()` 方法,然后根据查询类型进入 `QueryDataSetUtils.convertQueryDataSetByFetchSize()` 方法. + +`convertQueryDataSetByFetchSize()` 方法中获取结果的重要方法为 QueryDataSet 的 `hasNext()` 方法。 + +`hasNext()` 方法的主要逻辑如下: + +1. 判断是否有规定行偏移量 `rowOffset`,如果有则跳过需要偏移的行数;如果结果总行数少于规定的偏移量,则返回 false。 +2. 判断是否有规定行数限制 `rowLimit`,如果有则比较当前输出行数,当前输出行数大于行数限制则返回 false。 +3. 进入 `AlignByDeviceDataSet.hasNextWithoutConstraint()` 方法 + +
+ +- org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet + +首先解释一下结果集中重要字段的含义: + +- `deviceIterator`:按设备对齐查询本质上是计算出每个设备对应的映射值和过滤条件,然后将查询按设备分别进行,该字段即为设备的迭代器,每次查询获取一个设备进行。 +- `currentDataSet`:该字段代表了本次对某设备查询所获得的结果集。 + +`hasNextWithoutConstraint()` 方法所做的工作主要是判断当前结果集是否有下一结果,没有则获取下一设备,计算该设备执行查询需要的路径、数据类型及过滤条件,然后按其查询类型执行具体的查询计划后获得结果集,直至没有设备可进行查询。 + +其具体实现逻辑如下: + +1. 首先判断当前结果集是否被初始化且有下一个结果,如果是则直接返回 true,即当前可以调用 `next()` 方法获取下一个 `RowRecord`;否则设置结果集未被初始化进入步骤2. +2. 迭代 `deviceIterator` 获取本次执行需要的设备,之后 `deviceToMeasurementsMap` 中取得该设备对应的测点,得到 `executeColumns`. +3. 拼接当前设备名与 measurements,计算当前设备的查询路径、数据类型及过滤条件,得到对应的字段分别为 `executePaths`, `tsDataTypes`, `expression`,如果是聚合查询,则还需要计算 `executeAggregations`。 +4. 判断当前子查询类型为 GroupByQuery, AggregationQuery, FillQuery 或 RawDataQuery 进行对应的查询并返回结果集,实现逻辑可参考[原始数据查询](/#/SystemDesign/progress/chap5/sec3),[聚合查询](/#/SystemDesign/progress/chap5/sec4),[降采样查询](/#/SystemDesign/progress/chap5/sec5)。 + +通过 `hasNextWithoutConstraint()` 方法初始化结果集并确保有下一结果后,则可调用 `QueryDataSet.next()` 方法获取下一个 `RowRecord`. + +`next()` 方法主要实现逻辑为 `AlignByDeviceDataSet.nextWithoutConstraint()` 方法。 + +`nextWithoutConstraint()` 方法所做的工作主要是**将单个设备查询所得到的按时间对齐的结果集形式变换为按设备对齐的结果集形式**,并返回变换后的 `RowRecord`。 + +其具体实现逻辑如下: + +1. 首先从结果集中取得下一个按时间对齐的 `originRowRecord`。 +2. 新建一个添加了时间戳的 `RowRecord`,向其中加入设备列,先根据 `executeColumns` 与得到的结果建立一个由 `measurementName -> Field` 的 Map 结构 `currentColumnMap`. +3. 之后只需要遍历去重后的 `measurements` 列表,判断其类型,如果类型为 `Exist` 则根据 measurementName 从 `currentColumnMap` 中取得其对应的结果,如果没有则设为 `null`;如果是 `NonExist`类型,则直接设为 `null`; 如果是 `Constant` 类型,则将 `measureName` 作为该列的值。 + +再根据变换后的 `RowRecord` 写入输出数据流后,即可将结果集返回。 diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index 19b5909340ec..3cffb2c3c62c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java @@ -36,7 +36,7 @@ private SQLConstant() { // forbidding instantiation } - public static final String GROUPBY_DEVICE_COLUMN_NAME = "Device"; + public static final String ALIGNBY_DEVICE_COLUMN_NAME = "Device"; public static final String RESERVED_TIME = "time"; public static final String IS_AGGREGATION = "IS_AGGREGATION"; public static final String NOW_FUNC = "now()"; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java index c5700b1c5cb9..5faf7b3d9759 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.qp.physical.crud; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,25 +28,18 @@ public class AlignByDevicePlan extends QueryPlan { - private List measurements; // e.g. temperature, status, speed + private List measurements; // to record result measurement columns, e.g. temperature, status, speed private Map> deviceToMeasurementsMap; // e.g. root.ln.d1 -> temperature // to check data type consistency for the same name sensor of different devices - private Map dataTypeConsistencyChecker; + private Map measurementDataTypeMap; private Map deviceToFilterMap; + // to record different kinds of measurement + private Map measurementTypeMap; private GroupByPlan groupByPlan; private FillQueryPlan fillQueryPlan; private AggregationPlan aggregationPlan; - // the measurements that do not exist in any device, - // data type is considered as Boolean. The value is considered as null - private List notExistMeasurements = new ArrayList<>(); - private List positionOfNotExistMeasurements = new ArrayList<>(); - // the measurements that have quotation mark. e.g. "abc", - // '11', the data type is considered as String and the value is considered is the same with measurement name - private List constMeasurements = new ArrayList<>(); - private List positionOfConstMeasurements = new ArrayList<>(); - public AlignByDevicePlan() { super(); } @@ -69,13 +61,13 @@ public Map> getDeviceToMeasurementsMap() { return deviceToMeasurementsMap; } - public void setDataTypeConsistencyChecker( - Map dataTypeConsistencyChecker) { - this.dataTypeConsistencyChecker = dataTypeConsistencyChecker; + public void setMeasurementDataTypeMap( + Map measurementDataTypeMap) { + this.measurementDataTypeMap = measurementDataTypeMap; } - public Map getDataTypeConsistencyChecker() { - return dataTypeConsistencyChecker; + public Map getMeasurementDataTypeMap() { + return measurementDataTypeMap; } public Map getDeviceToFilterMap() { @@ -86,6 +78,15 @@ public void setDeviceToFilterMap(Map deviceToFilterMap) { this.deviceToFilterMap = deviceToFilterMap; } + public Map getMeasurementTypeMap() { + return measurementTypeMap; + } + + public void setMeasurementTypeMap( + Map measurementTypeMap) { + this.measurementTypeMap = measurementTypeMap; + } + public GroupByPlan getGroupByPlan() { return groupByPlan; } @@ -113,64 +114,14 @@ public void setAggregationPlan(AggregationPlan aggregationPlan) { this.setOperatorType(Operator.OperatorType.AGGREGATION); } - //we use the following algorithm to reproduce the order of measurements that user writes. - //suppose user writes SELECT 'c1',a1,b1,b2,'c2',a2,a3,'c3',b3,a4,a5 FROM ... where for each a_i - // column there is at least one device having it, and for each b_i column there is no device - // having it, and 'c_i' is a const column. - // Then, measurements is {a1, a2, a3, a4, a5}; - // notExistMeasurements = {b1, b2, b3}, and positionOfNotExistMeasurements is {2, 3, 8}; - // constMeasurements is {'c1', 'c2', 'c3'}, and positionOfConstMeasurements is {0, 4, 7}. - // When to reproduce the order of measurements. The pseudocode is: - //
-  // current = 0;
-  // if (min(notExist, const) <= current) {
-  //  pull min_element(notExist, const);
-  // } else {
-  //  pull from measurements;
-  // }
-  // current ++;
-  //
- - public void addNotExistMeasurement(int position, String measurement) { - notExistMeasurements.add(measurement); - positionOfNotExistMeasurements.add(position); - } - - public void addConstMeasurement(int position, String measurement) { - constMeasurements.add(measurement); - positionOfConstMeasurements.add(position); - } - - public List getNotExistMeasurements() { - return notExistMeasurements; - } - - public void setNotExistMeasurements(List notExistMeasurements) { - this.notExistMeasurements = notExistMeasurements; - } - - public List getPositionOfNotExistMeasurements() { - return positionOfNotExistMeasurements; - } - - public void setPositionOfNotExistMeasurements( - List positionOfNotExistMeasurements) { - this.positionOfNotExistMeasurements = positionOfNotExistMeasurements; - } - - public List getConstMeasurements() { - return constMeasurements; - } - - public void setConstMeasurements(List constMeasurements) { - this.constMeasurements = constMeasurements; - } - - public List getPositionOfConstMeasurements() { - return positionOfConstMeasurements; - } - - public void setPositionOfConstMeasurements(List positionOfConstMeasurements) { - this.positionOfConstMeasurements = positionOfConstMeasurements; + /** + * Exist: the measurements which don't belong to NonExist and Constant. + * NonExist: the measurements that do not exist in any device, data type is considered as String. + * The value is considered as null. + * Constant: the measurements that have quotation mark. e.g. "abc",'11'. + * The data type is considered as String and the value is the measurement name. + */ + public enum MeasurementType { + Exist, NonExist, Constant; } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 1f2413e59358..56f9b8021e1b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -59,13 +59,14 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CountPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; @@ -267,25 +268,24 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator) List suffixPaths = queryOperator.getSelectOperator().getSuffixPaths(); List originAggregations = queryOperator.getSelectOperator().getAggregations(); + // to record result measurement columns List measurements = new ArrayList<>(); Map> deviceToMeasurementsMap = new LinkedHashMap<>(); // to check the same measurement of different devices having the same datatype - Map dataTypeConsistencyChecker = new HashMap<>(); + Map measurementDataTypeMap = new HashMap<>(); + Map measurementTypeMap = new HashMap<>(); List paths = new ArrayList<>(); - // current location for measurements in SELECT - int loc = 0; - for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT Path suffixPath = suffixPaths.get(i); - Set nonExistMeasurement = new HashSet<>(); // to record measurements in the loop of a suffix path Set measurementSetOfGivenSuffix = new LinkedHashSet<>(); // if const measurement if (suffixPath.startWith("'") || suffixPath.startWith("\"")) { - alignByDevicePlan.addConstMeasurement(loc++, suffixPath.getMeasurement()); + measurements.add(suffixPath.getMeasurement()); + measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant); continue; } @@ -297,7 +297,11 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator) // for actual non exist path if (actualPaths.isEmpty() && originAggregations.isEmpty()) { - nonExistMeasurement.add(fullPath.getMeasurement()); + String nonExistMeasurement = fullPath.getMeasurement(); + if (measurementSetOfGivenSuffix.add(nonExistMeasurement) + && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) { + measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist); + } } for (String pathStr : actualPaths) { @@ -316,20 +320,21 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator) measurementChecked = path.getMeasurement(); } TSDataType dataType = getSeriesType(pathForDataType); - if (dataTypeConsistencyChecker.containsKey(measurementChecked)) { - if (!dataType.equals(dataTypeConsistencyChecker.get(measurementChecked))) { + if (measurementDataTypeMap.containsKey(measurementChecked)) { + if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) { throw new QueryProcessException( "The data types of the same measurement column should be the same across " + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the " + "SQL document."); } } else { - dataTypeConsistencyChecker.put(measurementChecked, dataType); + measurementDataTypeMap.put(measurementChecked, dataType); } - // update measurementSetOfGivenSuffix and measurement location - if (measurementSetOfGivenSuffix.add(measurementChecked)) { - loc++; + // update measurementSetOfGivenSuffix and Normal measurement + if (measurementSetOfGivenSuffix.add(measurementChecked) + || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) { + measurementTypeMap.put(measurementChecked, MeasurementType.Exist); } // update deviceToMeasurementsMap if (!deviceToMeasurementsMap.containsKey(device)) { @@ -347,11 +352,6 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator) } } - nonExistMeasurement.removeAll(measurementSetOfGivenSuffix); - // update notExistMeasurement - for (String notExistMeasurementString : nonExistMeasurement) { - alignByDevicePlan.addNotExistMeasurement(loc++, notExistMeasurementString); - } // update measurements // Note that in the loop of a suffix path, set is used. // And across the loops of suffix paths, list is used. @@ -372,7 +372,8 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator) // assigns to alignByDevicePlan alignByDevicePlan.setMeasurements(measurements); alignByDevicePlan.setDeviceToMeasurementsMap(deviceToMeasurementsMap); - alignByDevicePlan.setDataTypeConsistencyChecker(dataTypeConsistencyChecker); + alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap); + alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap); alignByDevicePlan.setPaths(paths); // get deviceToFilterMap diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java index 7974bfb13266..a2c389742f82 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -53,18 +55,11 @@ public class AlignByDeviceDataSet extends QueryDataSet { private QueryContext context; private IExpression expression; - private List deduplicatedMeasurementColumns; + private List measurements; private Map> deviceToMeasurementsMap; private Map deviceToFilterMap; + private Map measurementTypeMap; - // the measurements that do not exist in any device, - // data type is considered as Boolean. The value is considered as null - private List notExistMeasurements; - private List positionOfNotExistMeasurements; - // the measurements that have quotation mark. e.g. "abc", - // '11', the data type is considered as String and the value is considered is the same with measurement name - private List constMeasurements; - private List positionOfConstMeasurements; private GroupByPlan groupByPlan; private FillQueryPlan fillQueryPlan; @@ -75,24 +70,20 @@ public class AlignByDeviceDataSet extends QueryDataSet { private Iterator deviceIterator; private String currentDevice; private QueryDataSet currentDataSet; - private int[] currentColumnMapRelation; private Map tsDataTypeMap; + private List executeColumns; public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) { super(null, alignByDevicePlan.getDataTypes()); - // get deduplicated measurement columns (already deduplicated in TSServiceImpl.getAlignByDeviceQueryHeaders) - this.deduplicatedMeasurementColumns = alignByDevicePlan.getMeasurements(); + this.measurements = alignByDevicePlan.getMeasurements(); this.tsDataTypeMap = alignByDevicePlan.getDataTypeMapping(); this.queryRouter = queryRouter; this.context = context; this.deviceToMeasurementsMap = alignByDevicePlan.getDeviceToMeasurementsMap(); this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap(); - this.notExistMeasurements = alignByDevicePlan.getNotExistMeasurements(); - this.constMeasurements = alignByDevicePlan.getConstMeasurements(); - this.positionOfNotExistMeasurements = alignByDevicePlan.getPositionOfNotExistMeasurements(); - this.positionOfConstMeasurements = alignByDevicePlan.getPositionOfConstMeasurements(); + this.measurementTypeMap = alignByDevicePlan.getMeasurementTypeMap(); switch (alignByDevicePlan.getOperatorType()) { case GROUPBY: @@ -114,7 +105,6 @@ public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext co this.curDataSetInitialized = false; this.deviceIterator = deviceToMeasurementsMap.keySet().iterator(); - this.currentColumnMapRelation = new int[deduplicatedMeasurementColumns.size()]; } protected boolean hasNextWithoutConstraint() throws IOException { @@ -125,30 +115,11 @@ protected boolean hasNextWithoutConstraint() throws IOException { } while (deviceIterator.hasNext()) { - for (int i = 0; i < deduplicatedMeasurementColumns.size(); i++) { - currentColumnMapRelation[i] = -1; - } currentDevice = deviceIterator.next(); Set measurementColumnsOfGivenDevice = deviceToMeasurementsMap .get(currentDevice); + executeColumns = new ArrayList<>(measurementColumnsOfGivenDevice); - // get columns to execute for the current device and the column map relation - // e.g. root.sg.d0's measurementColumnsOfGivenDevice is {s2,s3}, and - // deduplicatedMeasurementColumns is {s1,s2,s3,s4,s5}, - // then the final executeColumns is [s2,s3], currentColumnMapRelation is [-1,0,1,-1,-1]. - List executeColumns = new ArrayList<>(); - int indexInExecuteColumns = -1; - for (String column : measurementColumnsOfGivenDevice) { - for (int i = 0; i < deduplicatedMeasurementColumns.size(); i++) { - String columnToExecute = deduplicatedMeasurementColumns.get(i); - if (columnToExecute.equals(column)) { - executeColumns.add(column); - indexInExecuteColumns++; - currentColumnMapRelation[i] = indexInExecuteColumns; - break; - } - } - } // extract paths and aggregations if exist from executeColumns List executePaths = new ArrayList<>(); List tsDataTypes = new ArrayList<>(); @@ -223,43 +194,32 @@ protected RowRecord nextWithoutConstraint() throws IOException { rowRecord.addField(deviceField); List measurementFields = originRowRecord.getFields(); - for (int mapPos : currentColumnMapRelation) { - if (mapPos == -1) { - rowRecord.addField(null); - } else { - rowRecord.addField(measurementFields.get(mapPos)); - } + Map currentColumnMap = new HashMap<>(); + for (int i = 0; i < measurementFields.size(); i++) { + currentColumnMap.put(executeColumns.get(i), measurementFields.get(i)); } - // build record with constant and non exist measurement - RowRecord outRecord = new RowRecord(originRowRecord.getTimestamp()); - int loc = 0; - int totalSize = notExistMeasurements.size() + constMeasurements.size() - + rowRecord.getFields().size(); - int notExistMeasurementsLoc = 0; - int constMeasurementsLoc = 0; - int resLoc = 0; - // don't forget device column, so loc - 1 is for looking up constant and non exist column - while (loc < totalSize) { - if (notExistMeasurementsLoc < notExistMeasurements.size() - && loc - 1 == positionOfNotExistMeasurements.get(notExistMeasurementsLoc)) { - outRecord.addField(null); - notExistMeasurementsLoc++; - } else if (constMeasurementsLoc < constMeasurements.size() - && loc - 1 == positionOfConstMeasurements.get(constMeasurementsLoc)) { - Field res = new Field(TSDataType.TEXT); - res.setBinaryV(Binary.valueOf(constMeasurements.get(constMeasurementsLoc))); - outRecord.addField(res); - constMeasurementsLoc++; - } else { - outRecord.addField(rowRecord.getFields().get(resLoc)); - resLoc++; + for (String measurement : measurements) { + switch (measurementTypeMap.get(measurement)) { + case Exist: + if (currentColumnMap.get(measurement) != null) { + rowRecord.addField(currentColumnMap.get(measurement)); + } else { + rowRecord.addField(new Field(null)); + } + break; + case NonExist: + rowRecord.addField(new Field(null)); + break; + case Constant: + Field res = new Field(TSDataType.TEXT); + res.setBinaryV(Binary.valueOf(measurement)); + rowRecord.addField(res); + break; } - - loc++; } - return outRecord; + return rowRecord; } private enum DataSetType { diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 071181b82e67..c7f892b55425 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -18,6 +18,9 @@ */ package org.apache.iotdb.db.service; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE; + import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; @@ -28,6 +31,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +64,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -192,7 +197,8 @@ public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { //check the version compatibility boolean compatible = checkCompatibility(req.getClient_protocol()); if (!compatible) { - tsStatus = getStatus(TSStatusCode.INCOMPATIBLE_VERSION, "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); + tsStatus = getStatus(TSStatusCode.INCOMPATIBLE_VERSION, + "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2); resp.setSessionId(sessionId); @@ -208,7 +214,7 @@ public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR); } TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2); + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2); resp.setSessionId(sessionId); logger.info( "{}: Login status: {}. User : {}", @@ -320,7 +326,7 @@ static TSStatus getStatus(TSStatusCode statusType) { /** * convert from TSStatusCode to TSStatus, which has message appending with existed status message * - * @param statusType status type + * @param statusType status type * @param appendMessage appending message */ private TSStatus getStatus(TSStatusCode statusType, String appendMessage) { @@ -574,7 +580,7 @@ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) { /** * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some - * AuthorPlan + * AuthorPlan */ private TSExecuteStatementResp internalExecuteQueryStatement( long statementId, PhysicalPlan plan, int fetchSize, String username) { @@ -798,113 +804,54 @@ private void getWideQueryHeaders( private void getAlignByDeviceQueryHeaders( AlignByDevicePlan plan, List respColumns, List columnTypes) { - // set columns in TSExecuteStatementResp. Note this is without deduplication. - respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME); + // set columns in TSExecuteStatementResp. + respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME); // get column types and do deduplication columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result List deduplicatedColumnsType = new ArrayList<>(); deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result - List deduplicatedMeasurementColumns = new ArrayList<>(); - Set tmpColumnSet = new HashSet<>(); - Map checker = plan.getDataTypeConsistencyChecker(); - // build column header with constant and non exist column and deduplicate - int loc = 0; - // size of total column - int totalSize = plan.getNotExistMeasurements().size() + plan.getConstMeasurements().size() - + plan.getMeasurements().size(); - // not exist column loc - int notExistMeasurementsLoc = 0; - // constant column loc - int constMeasurementsLoc = 0; - // normal column loc - int resLoc = 0; - // after removing duplicate, we must shift column position - int shiftLoc = 0; - while (loc < totalSize) { - boolean isNonExist = false; - boolean isConstant = false; - TSDataType type; - String column; - // not exist - if (isOneMeasurementIn(loc, - notExistMeasurementsLoc, plan.getPositionOfNotExistMeasurements())) { - // for shifting - plan.getPositionOfNotExistMeasurements().set(notExistMeasurementsLoc, loc - shiftLoc); - - type = TSDataType.TEXT; - column = plan.getNotExistMeasurements().get(notExistMeasurementsLoc); - notExistMeasurementsLoc++; - isNonExist = true; - } - // constant - else if (isOneMeasurementIn(loc, - constMeasurementsLoc, plan.getPositionOfConstMeasurements())) { - // for shifting - plan.getPositionOfConstMeasurements().set(constMeasurementsLoc, loc - shiftLoc); - - type = TSDataType.TEXT; - column = plan.getConstMeasurements().get(constMeasurementsLoc); - constMeasurementsLoc++; - isConstant = true; - } - // normal series - else { - type = checker.get(plan.getMeasurements().get(resLoc)); - column = plan.getMeasurements().get(resLoc); - resLoc++; + Set deduplicatedMeasurements = new LinkedHashSet<>(); + Map checker = plan.getMeasurementDataTypeMap(); + + // build column header with constant and non exist column and deduplication + List measurements = plan.getMeasurements(); + Map measurementTypeMap = plan.getMeasurementTypeMap(); + for (String measurement : measurements) { + TSDataType type = null; + switch (measurementTypeMap.get(measurement)) { + case Exist: + type = checker.get(measurement); + break; + case NonExist: + case Constant: + type = TSDataType.TEXT; } - + respColumns.add(measurement); columnTypes.add(type.toString()); - respColumns.add(column); - // deduplicate part - if (!tmpColumnSet.contains(column)) { - // Note that this deduplication strategy is consistent with that of client - // IoTDBQueryResultSet. - tmpColumnSet.add(column); - if (!isNonExist && !isConstant) { - // only refer to those normal measurements - deduplicatedMeasurementColumns.add(column); - } + + if (!deduplicatedMeasurements.contains(measurement)) { + deduplicatedMeasurements.add(measurement); deduplicatedColumnsType.add(type); - } else if (isConstant) { - shiftLoc++; - constMeasurementsLoc--; - plan.getConstMeasurements().remove(constMeasurementsLoc); - plan.getPositionOfConstMeasurements().remove(constMeasurementsLoc); - } else if (isNonExist) { - shiftLoc++; - notExistMeasurementsLoc--; - plan.getNotExistMeasurements().remove(notExistMeasurementsLoc); - plan.getPositionOfNotExistMeasurements().remove(notExistMeasurementsLoc); - } else { - shiftLoc++; } - - loc++; } // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use. - // i.e., used by DeviceIterateDataSet constructor in `fetchResults` stage. - plan.setMeasurements(deduplicatedMeasurementColumns); + // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage. + plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements)); plan.setDataTypes(deduplicatedColumnsType); // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing. plan.setPaths(null); - plan.setDataTypeConsistencyChecker(null); } - /** - * - * @param subLoc - * @param totalLoc - * @param measurementPositions - * @return true if the measurement at totalLoc is the subLoc measurement in measurementPositions, - * false otherwise - */ - private boolean isOneMeasurementIn(int totalLoc, - int subLoc, List measurementPositions) { - return subLoc < measurementPositions.size() && totalLoc == measurementPositions.get(subLoc); + private void getLastQueryHeaders( + QueryPlan plan, List respColumns, List columnTypes) + throws TException, QueryProcessException { + respColumns.add(COLUMN_TIMESERIES); + respColumns.add(COLUMN_VALUE); + columnTypes.add(TSDataType.TEXT.toString()); + columnTypes.add(TSDataType.TEXT.toString()); } @Override