Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
public class LocalQueryExecutor {

private static final Logger logger = LoggerFactory.getLogger(LocalQueryExecutor.class);
public static final String DEBUG_SHOW_QUERY_ID = "{}: local queryId for {}#{} is {}";
private DataGroupMember dataGroupMember;
private ClusterReaderFactory readerFactory;
private String name;
Expand Down Expand Up @@ -173,7 +174,7 @@ public Map<String, ByteBuffer> fetchMultSeries(long readerId, List<String> paths
Map<String, ByteBuffer> pathByteBuffers = Maps.newHashMap();

for (String path : paths) {
ByteBuffer byteBuffer = null;
ByteBuffer byteBuffer;
if (reader.hasNextBatch(path)) {
BatchData batchData = reader.nextBatch(path);

Expand Down Expand Up @@ -237,7 +238,7 @@ public long querySingleSeries(SingleSeriesQueryRequest request)
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
DEBUG_SHOW_QUERY_ID,
name,
request.getQueryId(),
request.getPath(),
Expand Down Expand Up @@ -320,12 +321,7 @@ public long queryMultSeries(MultSeriesQueryRequest request)
});

List<TSDataType> dataTypes = Lists.newArrayList();
request
.getDataTypeOrdinal()
.forEach(
dataType -> {
dataTypes.add(TSDataType.values()[dataType]);
});
request.getDataTypeOrdinal().forEach(dataType -> dataTypes.add(TSDataType.values()[dataType]));

Filter timeFilter = null;
Filter valueFilter = null;
Expand All @@ -345,7 +341,7 @@ public long queryMultSeries(MultSeriesQueryRequest request)
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
DEBUG_SHOW_QUERY_ID,
name,
request.getQueryId(),
request.getPath(),
Expand Down Expand Up @@ -512,7 +508,7 @@ public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
DEBUG_SHOW_QUERY_ID,
name,
request.getQueryId(),
request.getPath(),
Expand Down Expand Up @@ -592,7 +588,7 @@ public List<ByteBuffer> getAggrResult(GetAggrResultRequest request)

List<String> aggregations = request.getAggregations();
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
PartialPath path = null;
PartialPath path;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,10 @@ public List<AbstractMultPointReader> getMultSeriesReader(
for (PartialPath partialPath : paths) {
List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
partitionGroups.forEach(
partitionGroup -> {
partitionGroupListMap
.computeIfAbsent(partitionGroup, n -> new ArrayList<>())
.add(partialPath);
});
partitionGroup ->
partitionGroupListMap
.computeIfAbsent(partitionGroup, n -> new ArrayList<>())
.add(partialPath));
}

List<AbstractMultPointReader> multPointReaders = Lists.newArrayList();
Expand Down Expand Up @@ -514,6 +513,7 @@ private SeriesReader getSeriesReader(
((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header);
QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter);
return new SeriesReader(
path,
allSensors,
Expand Down Expand Up @@ -582,7 +582,7 @@ private AbstractMultPointReader getRemoteMultSeriesPointReader(
return new MultEmptyReader(fullPaths);
}
throw new StorageEngineException(
new RequestTimeOutException("Query " + paths + " in " + partitionGroup));
new RequestTimeOutException("Query multi-series: " + paths + " in " + partitionGroup));
}

/**
Expand Down Expand Up @@ -663,9 +663,9 @@ private MultSeriesQueryRequest constructMultQueryRequest(
if (path instanceof VectorPartialPath) {
StringBuilder builder = new StringBuilder(path.getFullPath());
List<PartialPath> pathList = ((VectorPartialPath) path).getSubSensorsPathList();
for (int i = 0; i < pathList.size(); i++) {
for (PartialPath partialPath : pathList) {
builder.append(":");
builder.append(pathList.get(i).getFullPath());
builder.append(partialPath.getFullPath());
}
fullPaths.add(builder.toString());
} else {
Expand All @@ -674,10 +674,7 @@ private MultSeriesQueryRequest constructMultQueryRequest(
});

List<Integer> dataTypeOrdinals = Lists.newArrayList();
dataTypes.forEach(
dataType -> {
dataTypeOrdinals.add(dataType.ordinal());
});
dataTypes.forEach(dataType -> dataTypeOrdinals.add(dataType.ordinal()));

request.setPath(fullPaths);
request.setHeader(partitionGroup.getHeader());
Expand Down Expand Up @@ -979,7 +976,7 @@ public IBatchReader getMultSeriesBatchReader(
QueryContext context,
DataGroupMember dataGroupMember,
boolean ascending)
throws StorageEngineException, QueryProcessException, IOException {
throws StorageEngineException, QueryProcessException {
// pull the newest data
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
Expand Down Expand Up @@ -79,13 +80,38 @@ public ClusterTimeGenerator(
}
}

@TestOnly
public ClusterTimeGenerator(
QueryContext context,
MetaGroupMember metaGroupMember,
ClusterReaderFactory clusterReaderFactory,
RawDataQueryPlan rawDataQueryPlan,
boolean onlyCheckLocalData)
throws StorageEngineException {
super(context);
this.queryPlan = rawDataQueryPlan;
this.readerFactory = clusterReaderFactory;
try {
readerFactory.syncMetaGroup();
if (onlyCheckLocalData) {
whetherHasLocalDataGroup(
queryPlan.getExpression(), metaGroupMember, queryPlan.isAscending());
} else {
constructNode(queryPlan.getExpression());
}
} catch (IOException | CheckConsistencyException e) {
throw new StorageEngineException(e);
}
}

@Override
protected IBatchReader generateNewBatchReader(SingleSeriesExpression expression)
throws IOException {
Filter filter = expression.getFilter();
Filter timeFilter = getTimeFilter(filter);
PartialPath path = (PartialPath) expression.getSeriesPath();
TSDataType dataType;
ManagedSeriesReader mergeReader = null;
ManagedSeriesReader mergeReader;
try {
dataType =
((CMManager) IoTDB.metaManager)
Expand All @@ -97,7 +123,7 @@ protected IBatchReader generateNewBatchReader(SingleSeriesExpression expression)
path,
queryPlan.getAllMeasurementsInDevice(path.getDevice()),
dataType,
null,
timeFilter,
filter,
context,
queryPlan.isAscending());
Expand All @@ -111,18 +137,6 @@ public boolean isHasLocalReader() {
return hasLocalReader;
}

public void setHasLocalReader(boolean hasLocalReader) {
this.hasLocalReader = hasLocalReader;
}

public QueryDataSet.EndPoint getEndPoint() {
return endPoint;
}

public void setEndPoint(QueryDataSet.EndPoint endPoint) {
this.endPoint = endPoint;
}

@Override
public String toString() {
return super.toString() + ", has local reader:" + hasLocalReader;
Expand Down Expand Up @@ -161,6 +175,7 @@ private Node constructNode(
private void checkHasLocalReader(
SingleSeriesExpression expression, MetaGroupMember metaGroupMember) throws IOException {
Filter filter = expression.getFilter();
Filter timeFilter = getTimeFilter(filter);
PartialPath path = (PartialPath) expression.getSeriesPath();
TSDataType dataType;
try {
Expand All @@ -184,7 +199,7 @@ private void checkHasLocalReader(
path,
queryPlan.getAllMeasurementsInDevice(path.getDevice()),
dataType,
null,
timeFilter,
filter,
context,
dataGroupMember,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
*/
public class BaseQueryTest extends BaseMember {

List<PartialPath> pathList;
List<TSDataType> dataTypes;
protected List<PartialPath> pathList;
protected List<TSDataType> dataTypes;

protected static void checkAggregations(
List<AggregateResult> aggregationResults, Object[] answer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void tearDown() throws Exception {
}

@Test
public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException {
public void testNoFilter() throws IOException, StorageEngineException {
RawDataQueryPlan plan = new RawDataQueryPlan();
plan.setDeduplicatedPathsAndUpdate(pathList);
plan.setDeduplicatedDataTypes(dataTypes);
Expand Down Expand Up @@ -97,7 +97,7 @@ public void testFilter()
}

@Test
public void testNoFilterWithRedirect() throws StorageEngineException, QueryProcessException {
public void testNoFilterWithRedirect() throws StorageEngineException {
RawDataQueryPlan plan = new RawDataQueryPlan();
plan.setDeduplicatedPathsAndUpdate(pathList);
plan.setDeduplicatedDataTypes(dataTypes);
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testFilterWithValueFilterRedirect()

@Test
public void testFilterWithTimeFilterRedirect()
throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
throws StorageEngineException, QueryProcessException {
IExpression expression =
new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10)));
RawDataQueryPlan plan = new RawDataQueryPlan();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.query.reader;

import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.query.BaseQueryTest;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;

import org.junit.Test;

import java.io.IOException;
import java.util.HashSet;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class ClusterReaderFactoryTest extends BaseQueryTest {

  /**
   * Verifies that TTL settings are respected by readers created through {@link
   * ClusterReaderFactory}: before any TTL is set the factory returns a concrete reader for the
   * series, and after setting a very small TTL (100 — units per {@code StorageEngine.setTTL},
   * presumably milliseconds; confirm against its contract) the same query yields {@code null}
   * because all existing data has expired.
   */
  @Test
  public void testTTL()
      throws StorageEngineException, MetadataException, QueryProcessException, IOException {

    ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
    RemoteQueryContext context =
        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));

    try {
      // No TTL yet: the series has readable data, so a reader must be returned.
      SeriesRawDataBatchReader seriesReader = createSeriesReader(readerFactory, context);
      assertNotNull(seriesReader);

      // An aggressive TTL expires all previously written data for the storage group,
      // so the factory should now report no readable data by returning null.
      StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), 100);
      seriesReader = createSeriesReader(readerFactory, context);
      assertNull(seriesReader);
    } finally {
      // Always release query resources and restore an effectively infinite TTL so
      // later tests are not affected by the aggressive TTL set above.
      QueryResourceManager.getInstance().endQuery(context.getQueryId());
      StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), Long.MAX_VALUE);
    }
  }

  /**
   * Builds a raw-data batch reader for the first test series with no time or value filter,
   * targeting the data group member of node 10. Factored out to avoid duplicating the
   * eight-argument call in {@link #testTTL()}.
   */
  private SeriesRawDataBatchReader createSeriesReader(
      ClusterReaderFactory readerFactory, RemoteQueryContext context)
      throws StorageEngineException, QueryProcessException, IOException {
    return (SeriesRawDataBatchReader)
        readerFactory.getSeriesBatchReader(
            pathList.get(0),
            new HashSet<>(),
            dataTypes.get(0),
            null,
            null,
            context,
            dataGroupMemberMap.get(TestUtils.getNode(10)),
            true);
  }
}
Loading