
Commit

Merge 596e1c1 into 2f61528
jt2594838 committed Mar 13, 2020
2 parents 2f61528 + 596e1c1 commit da6c5de
Showing 21 changed files with 470 additions and 271 deletions.
23 changes: 15 additions & 8 deletions server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -529,15 +529,22 @@ private String getSgByEngineFile(File file) {

/**
*
* @return TsFiles (seq or unseq) grouped by their storage group.
* @return TsFiles (seq or unseq) grouped by their storage group and partition number.
*/
public Map<String, List<TsFileResource>> getAllClosedStorageGroupTsFile() {
Map<String, List<TsFileResource>> ret = new HashMap<>();
for (Entry<String, StorageGroupProcessor> entry : processorMap
.entrySet()) {
ret.computeIfAbsent(entry.getKey(), sg -> new ArrayList<>()).addAll(entry.getValue().getSequenceFileTreeSet());
ret.get(entry.getKey()).addAll(entry.getValue().getUnSequenceFileList());
ret.get(entry.getKey()).removeIf(file -> !file.isClosed());
public Map<String, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<String, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
for (Entry<String, StorageGroupProcessor> entry : processorMap.entrySet()) {
List<TsFileResource> sequenceFiles = entry.getValue().getSequenceFileTreeSet();
for (TsFileResource sequenceFile : sequenceFiles) {
if (!sequenceFile.isClosed()) {
continue;
}
String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
Map<Long, List<TsFileResource>> storageGroupFiles =
    ret.computeIfAbsent(entry.getKey(), n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
}
}
return ret;
}
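
Note: the new grouping derives the time partition from the file's location on disk. A minimal sketch of the assumed layout, where a TsFile lives under .../<storageGroup>/<timePartition>/<file>.tsfile so the second-to-last path segment is the partition id (the inline split below is a stand-in for FilePathUtils.splitTsFilePath, and the concrete path is invented for illustration):

public class PartitionPathSketch {
  public static void main(String[] args) {
    // assumed layout: data/sequence/<storageGroup>/<timePartition>/<file>.tsfile
    String path = "data/sequence/root.sg1/42/1584089000000-1-0.tsfile";
    String[] fileSplits = path.split("/"); // stand-in for FilePathUtils.splitTsFilePath
    long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
    System.out.println(partitionNum); // 42
  }
}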
@@ -885,7 +885,7 @@ public void collectSeries(String startingPath, List<MeasurementSchema> timeserie
* and the wildcard will be removed.
* If the wildcard is at the tail, then the inference will go on until the storage groups are found
* and the wildcard will be kept.
* (2) Suppose the part of the path is a substring that begins after the storage group name. (e.g.,
* (2) Suppose the path of the path is a substring that begins after the storage group name. (e.g.,
* For "root.*.sg1.a.*.b.*" and "root.x.sg1" is a storage group, then this part is "a.*.b.*").
* For this part, keep what it is.
*
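
Note: the inference described in the javadoc above can be illustrated with a toy implementation. This sketch covers only the quoted example (a wildcard inside the storage-group prefix); it returns the kept remainder per matching storage group, and the method is a simplified stand-in for the real MManager logic, not its actual API:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class WildcardInferenceSketch {

  // toy version: pin the pattern's prefix to each matching storage group and
  // keep the remainder after the storage group name verbatim
  static Map<String, String> determineStorageGroup(String pattern, Set<String> storageGroups) {
    Map<String, String> result = new HashMap<>();
    String[] nodes = pattern.split("\\.");
    for (String sg : storageGroups) {
      String[] sgNodes = sg.split("\\.");
      if (sgNodes.length > nodes.length) {
        continue;
      }
      boolean matches = true;
      for (int i = 0; i < sgNodes.length; i++) {
        if (!nodes[i].equals("*") && !nodes[i].equals(sgNodes[i])) {
          matches = false;
          break;
        }
      }
      if (matches) {
        result.put(sg, String.join(".", Arrays.copyOfRange(nodes, sgNodes.length, nodes.length)));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(determineStorageGroup(
        "root.*.sg1.a.*.b.*", Collections.singleton("root.x.sg1")));
    // {root.x.sg1=a.*.b.*}
  }
}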
@@ -24,6 +24,10 @@

public class AggregationPlan extends RawDataQueryPlan {

// e.g., for select count(s1), count(s1), count(s2), count(s2), sum(s1)
// aggregations are count, count, count, count, sum
// deduplicatedAggregations are count, count, sum

private List<String> aggregations = new ArrayList<>();
private List<String> deduplicatedAggregations = new ArrayList<>();

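
Note: the comment above describes deduplication by (aggregation, series) pair. A minimal sketch of that idea, with illustrative data rather than the planner's actual fields:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
  public static void main(String[] args) {
    // select count(s1), count(s1), count(s2), count(s2), sum(s1)
    String[][] selected = {
        {"count", "s1"}, {"count", "s1"}, {"count", "s2"}, {"count", "s2"}, {"sum", "s1"}};
    Set<String> seen = new HashSet<>();
    List<String> deduplicatedAggregations = new ArrayList<>();
    for (String[] agg : selected) {
      // a column is a duplicate only if both the function and the series repeat
      if (seen.add(agg[0] + "(" + agg[1] + ")")) {
        deduplicatedAggregations.add(agg[0]);
      }
    }
    System.out.println(deduplicatedAggregations); // [count, count, sum]
  }
}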
@@ -299,7 +299,6 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
try {
// remove stars in SELECT to get actual paths
List<String> actualPaths = getMatchedTimeseries(fullPath.getFullPath());

// for actual non exist path
if (actualPaths.isEmpty() && originAggregations.isEmpty()) {
String nonExistMeasurement = fullPath.getMeasurement();
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
@@ -44,7 +45,7 @@ public abstract class AggregateResult {
private double doubleValue;
private Binary binaryValue;

private boolean hasResult;
protected boolean hasResult;

/**
* construct.
@@ -110,29 +111,32 @@ public static AggregateResult deserializeFrom(ByteBuffer buffer) {
TSDataType dataType = TSDataType.deserialize(buffer.getShort());
AggregateResult aggregateResult = AggregateResultFactory
.getAggrResultByType(aggregationType, dataType);
switch (dataType) {
case BOOLEAN:
aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
break;
case INT32:
aggregateResult.setIntValue(buffer.getInt());
break;
case INT64:
aggregateResult.setLongValue(buffer.getLong());
break;
case FLOAT:
aggregateResult.setFloatValue(buffer.getFloat());
break;
case DOUBLE:
aggregateResult.setDoubleValue(buffer.getDouble());
break;
case TEXT:
aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name());
boolean hasResult = ReadWriteIOUtils.readBool(buffer);
if (hasResult) {
switch (dataType) {
case BOOLEAN:
aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
break;
case INT32:
aggregateResult.setIntValue(buffer.getInt());
break;
case INT64:
aggregateResult.setLongValue(buffer.getLong());
break;
case FLOAT:
aggregateResult.setFloatValue(buffer.getFloat());
break;
case DOUBLE:
aggregateResult.setDoubleValue(buffer.getDouble());
break;
case TEXT:
aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name());
}
aggregateResult.deserializeSpecificFields(buffer);
}
aggregateResult.deserializeSpecificFields(buffer);
return aggregateResult;
}

@@ -141,29 +145,32 @@ public static AggregateResult deserializeFrom(ByteBuffer buffer) {
public void serializeTo(OutputStream outputStream) throws IOException {
aggregationType.serializeTo(outputStream);
ReadWriteIOUtils.write(resultDataType, outputStream);
switch (resultDataType) {
case BOOLEAN:
ReadWriteIOUtils.write(booleanValue, outputStream);
break;
case INT32:
ReadWriteIOUtils.write(intValue, outputStream);
break;
case INT64:
ReadWriteIOUtils.write(longValue, outputStream);
break;
case FLOAT:
ReadWriteIOUtils.write(floatValue, outputStream);
break;
case DOUBLE:
ReadWriteIOUtils.write(doubleValue, outputStream);
break;
case TEXT:
ReadWriteIOUtils.write(binaryValue, outputStream);
break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name());
ReadWriteIOUtils.write(hasResult(), outputStream);
if (hasResult()) {
switch (resultDataType) {
case BOOLEAN:
ReadWriteIOUtils.write(booleanValue, outputStream);
break;
case INT32:
ReadWriteIOUtils.write(intValue, outputStream);
break;
case INT64:
ReadWriteIOUtils.write(longValue, outputStream);
break;
case FLOAT:
ReadWriteIOUtils.write(floatValue, outputStream);
break;
case DOUBLE:
ReadWriteIOUtils.write(doubleValue, outputStream);
break;
case TEXT:
ReadWriteIOUtils.write(binaryValue, outputStream);
break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name());
}
serializeSpecificFields(outputStream);
}
serializeSpecificFields(outputStream);
}
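
Note: both methods now write/read a hasResult flag before the value, so an empty aggregate round-trips without serializing uninitialized fields. A self-contained sketch of the symmetric pattern, reduced to a single long payload (the real code switches on the result data type and also handles the type-specific fields):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OptionalResultSerDe {

  static void write(DataOutputStream out, boolean hasResult, long value) throws IOException {
    out.writeBoolean(hasResult); // flag first
    if (hasResult) {
      out.writeLong(value);      // payload only when a result exists
    }
  }

  static Long read(DataInputStream in) throws IOException {
    return in.readBoolean() ? in.readLong() : null; // mirror the write order exactly
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    write(new DataOutputStream(buf), false, 0L);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    System.out.println(read(in)); // null: an empty result survives the round trip
  }
}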

protected abstract void serializeSpecificFields(OutputStream outputStream) throws IOException;
@@ -294,4 +301,8 @@ protected boolean hasResult() {
public String toString() {
return String.valueOf(getResult());
}

public AggregationType getAggregationType() {
return aggregationType;
}
}
@@ -44,6 +44,11 @@ public AvgAggrResult(TSDataType seriesDataType) {
cnt = 0;
}

@Override
protected boolean hasResult() {
return cnt > 0;
}

@Override
public Double getResult() {
if (cnt > 0) {
@@ -120,6 +125,10 @@ public boolean isCalculatedAggregationResult() {
@Override
public void merge(AggregateResult another) {
AvgAggrResult anotherAvg = (AvgAggrResult) another;
if (anotherAvg.cnt == 0) {
// avoid two empty results producing a NaN
return;
}
avg = avg * ((double) cnt / (cnt + anotherAvg.cnt)) +
anotherAvg.avg * ((double) anotherAvg.cnt / (cnt + anotherAvg.cnt));
cnt += anotherAvg.cnt;
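
Note: the early return above guards the weighted merge. Without it, merging two empty results evaluates 0.0 / 0.0, which is NaN under IEEE-754 arithmetic, and that NaN would contaminate every subsequent merge. A standalone illustration:

public class AvgMergeSketch {
  public static void main(String[] args) {
    long cnt = 0, otherCnt = 0;
    double avg = 0, otherAvg = 0;
    // the unguarded formula with two empty inputs: both weights are 0.0 / 0.0
    double merged = avg * ((double) cnt / (cnt + otherCnt))
        + otherAvg * ((double) otherCnt / (cnt + otherCnt));
    System.out.println(Double.isNaN(merged)); // true
  }
}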
@@ -41,6 +41,12 @@ public FirstValueAggrResult(TSDataType dataType) {
reset();
}

@Override
public void reset() {
super.reset();
timestamp = Long.MAX_VALUE;
}

@Override
public Object getResult() {
return hasResult() ? getValue() : null;
@@ -40,6 +40,12 @@ public LastValueAggrResult(TSDataType dataType) {
reset();
}

@Override
public void reset() {
super.reset();
timestamp = Long.MIN_VALUE;
}

@Override
public Object getResult() {
return hasResult() ? getValue() : null;
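
Note: the two reset overrides (here and in FirstValueAggrResult above) use opposite sentinels on purpose: FirstValueAggrResult starts at Long.MAX_VALUE so any real timestamp compares as earlier and is accepted, LastValueAggrResult at Long.MIN_VALUE so any real timestamp compares as later. A one-line check of each comparison:

public class SentinelSketch {
  public static void main(String[] args) {
    long anyTimestamp = 1584089000000L;
    System.out.println(anyTimestamp < Long.MAX_VALUE); // true: replaces the "first" slot
    System.out.println(anyTimestamp > Long.MIN_VALUE); // true: replaces the "last" slot
  }
}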
@@ -27,18 +27,21 @@
public abstract class GroupByEngineDataSet extends QueryDataSet {

protected long queryId;
private long interval;
private long slidingStep;
protected long interval;
protected long slidingStep;
// total query [startTime, endTime)
private long startTime;
private long endTime;
protected long startTime;
protected long endTime;

// current interval [curStartTime, curEndTime)
protected long curStartTime;
protected long curEndTime;
private int usedIndex;
protected int usedIndex;
protected boolean hasCachedTimeInterval;

public GroupByEngineDataSet() {
}

/**
* groupBy query.
*/
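
Note: with interval, slidingStep, startTime, endTime and usedIndex promoted to protected, subclasses can step through the same windows. A sketch of the window arithmetic implied by the comments, stepping [curStartTime, curEndTime) by slidingStep inside the total range [startTime, endTime) (an illustration of the stated convention, not the class's literal code):

public class GroupByWindowSketch {
  public static void main(String[] args) {
    long startTime = 0, endTime = 10, interval = 3, slidingStep = 4;
    for (int usedIndex = 0; ; usedIndex++) {
      long curStartTime = startTime + usedIndex * slidingStep;
      if (curStartTime >= endTime) {
        break; // total query range [startTime, endTime) exhausted
      }
      long curEndTime = Math.min(curStartTime + interval, endTime);
      System.out.println("[" + curStartTime + ", " + curEndTime + ")");
    }
    // prints [0, 3)  [4, 7)  [8, 10)
  }
}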
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.query.dataset.groupby;

import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.tsfile.utils.Pair;


/**
* Each executor calculates the results of all aggregations on a single series
*/
public interface GroupByExecutor {

/**
* add reusable result cache in executor
*/
void addAggregateResult(AggregateResult aggrResult);

/**
* calculate result in [curStartTime, curEndTime)
*/
List<AggregateResult> calcResult(long curStartTime, long curEndTime) throws IOException, QueryProcessException;
}
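
Note: the intended calling pattern is to register one reusable result per aggregation via addAggregateResult up front, then call calcResult once per time window. A self-contained toy mirroring that contract, with double[] slots standing in for AggregateResult so the sketch runs on its own:

import java.util.ArrayList;
import java.util.List;

public class GroupByExecutorSketch {

  // simplified stand-in for the GroupByExecutor contract
  interface Executor {
    void addAggregateResult(double[] resultSlot);
    List<double[]> calcResult(long curStartTime, long curEndTime);
  }

  // counts points of one in-memory series per window; illustration only
  static class CountExecutor implements Executor {
    private final List<double[]> results = new ArrayList<>();
    private final long[] timestamps;

    CountExecutor(long[] timestamps) {
      this.timestamps = timestamps;
    }

    @Override
    public void addAggregateResult(double[] resultSlot) {
      results.add(resultSlot); // reusable result cache, registered once up front
    }

    @Override
    public List<double[]> calcResult(long curStartTime, long curEndTime) {
      long count = 0;
      for (long t : timestamps) {
        if (t >= curStartTime && t < curEndTime) { // [curStartTime, curEndTime)
          count++;
        }
      }
      for (double[] slot : results) {
        slot[0] = count;
      }
      return results;
    }
  }

  public static void main(String[] args) {
    CountExecutor executor = new CountExecutor(new long[] {1, 2, 5, 9});
    executor.addAggregateResult(new double[1]);
    System.out.println(executor.calcResult(0, 5).get(0)[0]); // 2.0
  }
}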
@@ -29,11 +29,14 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -53,7 +56,10 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
/**
* group by batch calculation size.
*/
private int timeStampFetchSize;
protected int timeStampFetchSize;

public GroupByWithValueFilterDataSet() {
}

/**
* constructor.
@@ -74,18 +80,28 @@ public GroupByWithValueFilterDataSet(long queryId, GroupByPlan groupByPlan) {
/**
* init reader and aggregate function.
*/
private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
throws StorageEngineException {
this.timestampGenerator = new ServerTimeGenerator(groupByPlan.getExpression(), context);
this.timestampGenerator = getTimeGenerator(groupByPlan.getExpression(), context);
this.allDataReaderList = new ArrayList<>();
this.groupByPlan = groupByPlan;
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
allDataReaderList.add(new SeriesReaderByTimestamp(path, dataTypes.get(i), context,
QueryResourceManager.getInstance().getQueryDataSource(path, context, null), null));
allDataReaderList.add(getReaderByTime(path, dataTypes.get(i), context, null));
}
}

protected TimeGenerator getTimeGenerator(IExpression expression, QueryContext context)
throws StorageEngineException {
return new ServerTimeGenerator(expression, context);
}

protected IReaderByTimestamp getReaderByTime(Path path,
TSDataType dataType, QueryContext context, TsFileFilter fileFilter) throws StorageEngineException {
return new SeriesReaderByTimestamp(path, dataType, context,
QueryResourceManager.getInstance().getQueryDataSource(path, context, null), fileFilter);
}
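
Note: getTimeGenerator and getReaderByTime are now protected factory methods, so a subclass (together with the new empty constructor) can substitute its own time generator and per-series readers without touching the group-by loop. A hypothetical subclass sketch; RemoteTimeGenerator and RemoteReaderByTimestamp are invented names for whatever a distributed deployment would plug in, not real classes:

import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

public class ClusterGroupByWithValueFilterDataSet extends GroupByWithValueFilterDataSet {

  @Override
  protected TimeGenerator getTimeGenerator(IExpression expression, QueryContext context)
      throws StorageEngineException {
    return new RemoteTimeGenerator(expression, context); // hypothetical class
  }

  @Override
  protected IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
      QueryContext context, TsFileFilter fileFilter) throws StorageEngineException {
    return new RemoteReaderByTimestamp(path, dataType, context, fileFilter); // hypothetical class
  }
}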

@Override
protected RowRecord nextWithoutConstraint() throws IOException {
if (!hasCachedTimeInterval) {
