diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java index dcd00c84ef036..1392f128951fa 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java @@ -92,6 +92,7 @@ public class LocalQueryExecutor { private static final Logger logger = LoggerFactory.getLogger(LocalQueryExecutor.class); + public static final String DEBUG_SHOW_QUERY_ID = "{}: local queryId for {}#{} is {}"; private DataGroupMember dataGroupMember; private ClusterReaderFactory readerFactory; private String name; @@ -173,7 +174,7 @@ public Map fetchMultSeries(long readerId, List paths Map pathByteBuffers = Maps.newHashMap(); for (String path : paths) { - ByteBuffer byteBuffer = null; + ByteBuffer byteBuffer; if (reader.hasNextBatch(path)) { BatchData batchData = reader.nextBatch(path); @@ -237,7 +238,7 @@ public long querySingleSeries(SingleSeriesQueryRequest request) request.getFetchSize(), request.getDeduplicatedPathNum()); logger.debug( - "{}: local queryId for {}#{} is {}", + DEBUG_SHOW_QUERY_ID, name, request.getQueryId(), request.getPath(), @@ -320,12 +321,7 @@ public long queryMultSeries(MultSeriesQueryRequest request) }); List dataTypes = Lists.newArrayList(); - request - .getDataTypeOrdinal() - .forEach( - dataType -> { - dataTypes.add(TSDataType.values()[dataType]); - }); + request.getDataTypeOrdinal().forEach(dataType -> dataTypes.add(TSDataType.values()[dataType])); Filter timeFilter = null; Filter valueFilter = null; @@ -345,7 +341,7 @@ public long queryMultSeries(MultSeriesQueryRequest request) request.getFetchSize(), request.getDeduplicatedPathNum()); logger.debug( - "{}: local queryId for {}#{} is {}", + DEBUG_SHOW_QUERY_ID, name, request.getQueryId(), request.getPath(), @@ -512,7 +508,7 @@ public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) request.getFetchSize(), request.getDeduplicatedPathNum()); logger.debug( - "{}: local queryId for {}#{} is {}", + DEBUG_SHOW_QUERY_ID, name, request.getQueryId(), request.getPath(), @@ -592,7 +588,7 @@ public List getAggrResult(GetAggrResultRequest request) List aggregations = request.getAggregations(); TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()]; - PartialPath path = null; + PartialPath path; try { path = new PartialPath(request.getPath()); } catch (IllegalPathException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index eb47005266138..0f83359d58e87 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -237,11 +237,10 @@ public List getMultSeriesReader( for (PartialPath partialPath : paths) { List partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath); partitionGroups.forEach( - partitionGroup -> { - partitionGroupListMap - .computeIfAbsent(partitionGroup, n -> new ArrayList<>()) - .add(partialPath); - }); + partitionGroup -> + partitionGroupListMap + .computeIfAbsent(partitionGroup, n -> new ArrayList<>()) + .add(partialPath)); } List multPointReaders = Lists.newArrayList(); @@ -514,6 +513,7 @@ private SeriesReader getSeriesReader( ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header); QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); + valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter); return new SeriesReader( path, allSensors, @@ -582,7 +582,7 @@ private AbstractMultPointReader getRemoteMultSeriesPointReader( return new MultEmptyReader(fullPaths); } throw new StorageEngineException( - new RequestTimeOutException("Query " + paths + " in " + partitionGroup)); + new RequestTimeOutException("Query multi-series: " + paths + " in " + partitionGroup)); } /** @@ -663,9 +663,9 @@ private MultSeriesQueryRequest constructMultQueryRequest( if (path instanceof VectorPartialPath) { StringBuilder builder = new StringBuilder(path.getFullPath()); List pathList = ((VectorPartialPath) path).getSubSensorsPathList(); - for (int i = 0; i < pathList.size(); i++) { + for (PartialPath partialPath : pathList) { builder.append(":"); - builder.append(pathList.get(i).getFullPath()); + builder.append(partialPath.getFullPath()); } fullPaths.add(builder.toString()); } else { @@ -674,10 +674,7 @@ private MultSeriesQueryRequest constructMultQueryRequest( }); List dataTypeOrdinals = Lists.newArrayList(); - dataTypes.forEach( - dataType -> { - dataTypeOrdinals.add(dataType.ordinal()); - }); + dataTypes.forEach(dataType -> dataTypeOrdinals.add(dataType.ordinal())); request.setPath(fullPaths); request.setHeader(partitionGroup.getHeader()); @@ -979,7 +976,7 @@ public IBatchReader getMultSeriesBatchReader( QueryContext context, DataGroupMember dataGroupMember, boolean ascending) - throws StorageEngineException, QueryProcessException, IOException { + throws StorageEngineException, QueryProcessException { // pull the newest data try { dataGroupMember.syncLeaderWithConsistencyCheck(false); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java index 43d266838e079..4b13e988f7a5a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.ExpressionType; @@ -79,13 +80,38 @@ public ClusterTimeGenerator( } } + @TestOnly + public ClusterTimeGenerator( + QueryContext context, + MetaGroupMember metaGroupMember, + ClusterReaderFactory clusterReaderFactory, + RawDataQueryPlan rawDataQueryPlan, + boolean onlyCheckLocalData) + throws StorageEngineException { + super(context); + this.queryPlan = rawDataQueryPlan; + this.readerFactory = clusterReaderFactory; + try { + readerFactory.syncMetaGroup(); + if (onlyCheckLocalData) { + whetherHasLocalDataGroup( + queryPlan.getExpression(), metaGroupMember, queryPlan.isAscending()); + } else { + constructNode(queryPlan.getExpression()); + } + } catch (IOException | CheckConsistencyException e) { + throw new StorageEngineException(e); + } + } + @Override protected IBatchReader generateNewBatchReader(SingleSeriesExpression expression) throws IOException { Filter filter = expression.getFilter(); + Filter timeFilter = getTimeFilter(filter); PartialPath path = (PartialPath) expression.getSeriesPath(); TSDataType dataType; - ManagedSeriesReader mergeReader = null; + ManagedSeriesReader mergeReader; try { dataType = ((CMManager) IoTDB.metaManager) @@ -97,7 +123,7 @@ protected IBatchReader generateNewBatchReader(SingleSeriesExpression expression) path, queryPlan.getAllMeasurementsInDevice(path.getDevice()), dataType, - null, + timeFilter, filter, context, queryPlan.isAscending()); @@ -111,18 +137,6 @@ public boolean isHasLocalReader() { return hasLocalReader; } - public void setHasLocalReader(boolean hasLocalReader) { - this.hasLocalReader = hasLocalReader; - } - - public QueryDataSet.EndPoint getEndPoint() { - return endPoint; - } - - public void setEndPoint(QueryDataSet.EndPoint endPoint) { - this.endPoint = endPoint; - } - @Override public String toString() { return super.toString() + ", has local reader:" + hasLocalReader; @@ -161,6 +175,7 @@ private Node constructNode( private void checkHasLocalReader( SingleSeriesExpression expression, MetaGroupMember metaGroupMember) throws IOException { Filter filter = expression.getFilter(); + Filter timeFilter = getTimeFilter(filter); PartialPath path = (PartialPath) expression.getSeriesPath(); TSDataType dataType; try { @@ -184,7 +199,7 @@ private void checkHasLocalReader( path, queryPlan.getAllMeasurementsInDevice(path.getDevice()), dataType, - null, + timeFilter, filter, context, dataGroupMember, diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java index eb048722307b0..c0ab4cc0ff370 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java @@ -50,8 +50,8 @@ */ public class BaseQueryTest extends BaseMember { - List pathList; - List dataTypes; + protected List pathList; + protected List dataTypes; protected static void checkAggregations( List aggregationResults, Object[] answer) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java index a303fe02ccb22..65148b0b83df0 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java @@ -60,7 +60,7 @@ public void tearDown() throws Exception { } @Test - public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException { + public void testNoFilter() throws IOException, StorageEngineException { RawDataQueryPlan plan = new RawDataQueryPlan(); plan.setDeduplicatedPathsAndUpdate(pathList); plan.setDeduplicatedDataTypes(dataTypes); @@ -97,7 +97,7 @@ public void testFilter() } @Test - public void testNoFilterWithRedirect() throws StorageEngineException, QueryProcessException { + public void testNoFilterWithRedirect() throws StorageEngineException { RawDataQueryPlan plan = new RawDataQueryPlan(); plan.setDeduplicatedPathsAndUpdate(pathList); plan.setDeduplicatedDataTypes(dataTypes); @@ -137,7 +137,7 @@ public void testFilterWithValueFilterRedirect() @Test public void testFilterWithTimeFilterRedirect() - throws IOException, StorageEngineException, QueryProcessException, IllegalPathException { + throws StorageEngineException, QueryProcessException { IExpression expression = new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10))); RawDataQueryPlan plan = new RawDataQueryPlan(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java new file mode 100644 index 0000000000000..21ec343295484 --- /dev/null +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.reader; + +import org.apache.iotdb.cluster.common.TestUtils; +import org.apache.iotdb.cluster.query.BaseQueryTest; +import org.apache.iotdb.cluster.query.RemoteQueryContext; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; + +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class ClusterReaderFactoryTest extends BaseQueryTest { + + @Test + public void testTTL() + throws StorageEngineException, MetadataException, QueryProcessException, IOException { + + ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember); + RemoteQueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + + try { + SeriesRawDataBatchReader seriesReader = + (SeriesRawDataBatchReader) + readerFactory.getSeriesBatchReader( + pathList.get(0), + new HashSet<>(), + dataTypes.get(0), + null, + null, + context, + dataGroupMemberMap.get(TestUtils.getNode(10)), + true); + assertNotNull(seriesReader); + StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), 100); + seriesReader = + (SeriesRawDataBatchReader) + readerFactory.getSeriesBatchReader( + pathList.get(0), + new HashSet<>(), + dataTypes.get(0), + null, + null, + context, + dataGroupMemberMap.get(TestUtils.getNode(10)), + true); + assertNull(seriesReader); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), Long.MAX_VALUE); + } + } +} diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java index a414fd970c50d..17261b26063e9 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query.reader; import org.apache.iotdb.cluster.common.TestUtils; +import org.apache.iotdb.cluster.exception.EmptyIntervalException; import org.apache.iotdb.cluster.query.BaseQueryTest; import org.apache.iotdb.cluster.query.RemoteQueryContext; import org.apache.iotdb.db.exception.StorageEngineException; @@ -29,14 +30,21 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; import org.junit.Test; import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -72,4 +80,54 @@ public void test() QueryResourceManager.getInstance().endQuery(context.getQueryId()); } } + + @Test + public void testTimeFilter() + throws StorageEngineException, IOException, IllegalPathException, QueryProcessException { + RawDataQueryPlan dataQueryPlan = new RawDataQueryPlan(); + QueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + Filter valueFilter = ValueFilter.gtEq(3.0); + Filter timeFilter = TimeFilter.ltEq(8); + try { + IExpression expression = + new SingleSeriesExpression( + new PartialPath(TestUtils.getTestSeries(0, 0)), + new AndFilter(valueFilter, timeFilter)); + dataQueryPlan.setExpression(expression); + dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(0, 0))); + + // capture the time filter used to create a reader + AtomicReference timeFilterRef = new AtomicReference<>(null); + ClusterReaderFactory clusterReaderFactory = + new ClusterReaderFactory(testMetaMember) { + @Override + public ManagedSeriesReader getSeriesReader( + PartialPath path, + Set deviceMeasurements, + TSDataType dataType, + Filter timeFilter, + Filter valueFilter, + QueryContext context, + boolean ascending) + throws StorageEngineException, EmptyIntervalException { + timeFilterRef.set(timeFilter); + return super.getSeriesReader( + path, deviceMeasurements, dataType, timeFilter, valueFilter, context, ascending); + } + }; + ClusterTimeGenerator timeGenerator = + new ClusterTimeGenerator( + context, testMetaMember, clusterReaderFactory, dataQueryPlan, false); + + for (int i = 3; i <= 8; i++) { + assertTrue(timeGenerator.hasNext()); + assertEquals(i, timeGenerator.next()); + } + assertFalse(timeGenerator.hasNext()); + assertEquals(timeFilter, timeFilterRef.get()); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + } + } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java index d2eeca437388e..ea8230313439f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java @@ -75,7 +75,7 @@ public class BaseMember { public static AtomicLong dummyResponse = new AtomicLong(Response.RESPONSE_AGREE); - Map dataGroupMemberMap; + protected Map dataGroupMemberMap; private Map metaGroupMemberMap; PartitionGroup allNodes; protected MetaGroupMember testMetaMember; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java index 8698955ddf457..6af9c4eb338fd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java @@ -191,4 +191,9 @@ private boolean readPageData() throws IOException { private boolean isEmpty(BatchData batchData) { return batchData == null || !batchData.hasCurrent(); } + + @TestOnly + public SeriesReader getSeriesReader() { + return seriesReader; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 8589b95527c58..7c1213fd52d02 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -1221,4 +1221,9 @@ public boolean getAscending() { public TimeOrderUtils getOrderUtils() { return orderUtils; } + + @TestOnly + public Filter getValueFilter() { + return valueFilter; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java index 0982f6d2d8efc..2b7db36027ce7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java @@ -27,6 +27,7 @@ import java.util.PriorityQueue; /** This class implements {@link IPointReader} for data sources with different priorities. */ +@SuppressWarnings("ConstantConditions") // heap is ensured by hasNext non-empty public class PriorityMergeReader implements IPointReader { // max time of all added readers in PriorityMergeReader diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java index 403b0e9b8f0cd..645eb6b05c8ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java @@ -130,7 +130,7 @@ protected IBatchReader generateNewBatchReader(SingleSeriesExpression expression) } /** extract time filter from a value filter */ - private Filter getTimeFilter(Filter filter) { + protected Filter getTimeFilter(Filter filter) { if (filter instanceof UnaryFilter && ((UnaryFilter) filter).getFilterType() == FilterType.TIME_FILTER) { return filter;