Skip to content

Commit

Permalink
add TsFileFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Feb 21, 2020
1 parent 59cba45 commit 9355a1f
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -80,3 +80,6 @@ tsfile/src/test/resources/*.ts
local-snapshots-dir/
venv/

partitions.tmp
partitions
node_identifier
Expand Up @@ -78,7 +78,7 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context)
TSDataType dataType = deduplicatedDataTypes.get(i);

ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
true);
true, null);
readersOfSelectedSeries.add(reader);
}

Expand All @@ -104,7 +104,7 @@ public QueryDataSet executeNonAlign(QueryContext context)
TSDataType dataType = deduplicatedDataTypes.get(i);

ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
true);
true, null);
readersOfSelectedSeries.add(reader);
}

Expand Down
Expand Up @@ -54,7 +54,8 @@ public abstract void constructReaders(Path path, QueryContext context)
void constructReaders(Path path, QueryContext context, long beforeRange)
throws IOException, StorageEngineException {
Filter timeFilter = constructFilter(beforeRange);
allDataReader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, true);
allDataReader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, true,
null);
}

public abstract IPointReader getFillResult() throws IOException, UnSupportedFillTypeException;
Expand Down
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.query.filter;

import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

/**
* TsFileFilter is used to filter unwanted TsFiles in a QueryDataSource, to better support data
* partitioning in the distributed version and other features.
*/
@FunctionalInterface
public interface TsFileFilter {
boolean fileNotSatisfy(TsFileResource resource);
}
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
Expand Down Expand Up @@ -48,9 +49,10 @@ public class SeriesReaderWithValueFilter extends SeriesReaderWithoutValueFilter
private TimeValuePair timeValuePair;
private BatchData batchData;

public SeriesReaderWithValueFilter(Path seriesPath, TSDataType dataType, Filter filter, QueryContext context)
public SeriesReaderWithValueFilter(Path seriesPath, TSDataType dataType, Filter filter,
QueryContext context, TsFileFilter fileFilter)
throws StorageEngineException, IOException {
super(seriesPath, dataType, filter, context, false);
super(seriesPath, dataType, filter, context, false, fileFilter);
this.filter = filter;
}

Expand Down
Expand Up @@ -23,9 +23,11 @@
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
Expand Down Expand Up @@ -83,9 +85,11 @@ public class SeriesReaderWithoutValueFilter implements ManagedSeriesReader {
* to. We do not push down value filter to unsequence readers
*/
public SeriesReaderWithoutValueFilter(Path seriesPath, TSDataType dataType, Filter timeFilter,
QueryContext context, boolean pushdownUnseq) throws StorageEngineException, IOException {
QueryContext context, boolean pushdownUnseq, TsFileFilter fileFilter) throws StorageEngineException,
IOException {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context);
QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
timeFilter = queryDataSource.updateTimeFilter(timeFilter);

// reader for sequence resources
Expand Down
Expand Up @@ -56,7 +56,8 @@ public Node construct(IExpression expression, QueryContext context)
Filter filter = ((SingleSeriesExpression) expression).getFilter();
Path path = ((SingleSeriesExpression) expression).getSeriesPath();
TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
return new EngineLeafNode(new SeriesReaderWithValueFilter(path, dataType, filter, context));
return new EngineLeafNode(new SeriesReaderWithValueFilter(path, dataType, filter, context
, null));
} catch (IOException | PathException e) {
throw new StorageEngineException(e.getMessage());
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
Expand Up @@ -22,6 +22,9 @@
import java.util.List;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;

public class QueryUtils {
Expand Down Expand Up @@ -73,4 +76,15 @@ private static boolean doModifyChunkMetaData(Modification modification, ChunkMet
}
return false;
}

// remove files that do not satisfy the filter
public static void filterQueryDataSource(QueryDataSource queryDataSource, TsFileFilter fileFilter) {
if (fileFilter == null) {
return;
}
List<TsFileResource> seqResources = queryDataSource.getSeqResources();
List<TsFileResource> unseqResources = queryDataSource.getUnseqResources();
seqResources.removeIf(fileFilter::fileNotSatisfy);
unseqResources.removeIf(fileFilter::fileNotSatisfy);
}
}

0 comments on commit 9355a1f

Please sign in to comment.