Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
Expand Down Expand Up @@ -67,6 +68,7 @@

import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
import static org.apache.iotdb.rpc.TSStatusCode.DATE_OUT_OF_RANGE;

Expand Down Expand Up @@ -450,24 +452,40 @@ public void releaseMemoryReservationManager() {

public void initQueryDataSource(List<IFullPath> sourcePaths) throws QueryProcessException {
long startTime = System.nanoTime();
if (sourcePaths == null) {
if (sourcePaths == null || sourcePaths.isEmpty()) {
this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE;
return;
}
dataRegion.readLock();
try {
List<IFullPath> pathList = new ArrayList<>();

IDeviceID singleDeviceId = null;
if (sourcePaths.size() == 1) {
singleDeviceId = sourcePaths.get(0).getDeviceId();
} else {
Set<IDeviceID> selectedDeviceIdSet = new HashSet<>();
for (IFullPath path : sourcePaths) {
pathList.add(path);
selectedDeviceIdSet.add(path.getDeviceId());
for (IFullPath sourcePath : sourcePaths) {
if (sourcePath instanceof AlignedFullPath) {
singleDeviceId = null;
break;
} else {
singleDeviceId = sourcePath.getDeviceId();
selectedDeviceIdSet.add(singleDeviceId);
if (selectedDeviceIdSet.size() > 1) {
singleDeviceId = null;
break;
}
}
}
}

dataRegion.readLock();
try {

this.sharedQueryDataSource =
dataRegion.query(
pathList,
sourcePaths,
// when all the selected series are under the same device, the QueryDataSource will be
// filtered according to timeIndex
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
singleDeviceId,
this,
// time filter may be stateful, so we need to copy it
globalTimeFilter != null ? globalTimeFilter.copy() : null,
Expand All @@ -479,7 +497,7 @@ public void initQueryDataSource(List<IFullPath> sourcePaths) throws QueryProcess
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
((QueryDataSource) sharedQueryDataSource).setSingleDevice(selectedDeviceIdSet.size() == 1);
((QueryDataSource) sharedQueryDataSource).setSingleDevice(singleDeviceId != null);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public TimeseriesMetadata get(
queryContext.getQueryStatistics().getLoadBloomFilterActualIOSize()::addAndGet;
boolean cacheHit = true;
try {
String deviceStringFormat = key.device.toString();
if (!CACHE_ENABLE) {
String deviceStringFormat = key.device.toString();
cacheHit = false;

// bloom filter part
Expand Down Expand Up @@ -158,6 +158,7 @@ public TimeseriesMetadata get(
DEBUG_LOGGER.info("Cache miss: {}.{} in file: {}", key.device, key.measurement, filePath);
DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors);
}
String deviceStringFormat = key.device.toString();
// allow for the parallelism of different devices
synchronized (
devices.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class VirtualDataRegion implements IDataRegionForQuery {

private static final String VIRTUAL_DB_NAME = "root.__virtual";

private static final QueryDataSource EMPTY_QUERY_DATA_SOURCE =
public static final QueryDataSource EMPTY_QUERY_DATA_SOURCE =
new QueryDataSource(Collections.emptyList(), Collections.emptyList());

private static final QueryDataSourceForRegionScan EMPTY_REGION_QUERY_DATA_SOURCE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,23 +634,26 @@ public Optional<Long> getEndTime(IDeviceID deviceId) {

// cannot use FileTimeIndex
public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) {
if (timeIndex instanceof ArrayDeviceTimeIndex && !definitelyNotContains(deviceId)) {
// checked above
//noinspection OptionalGetWithoutIsPresent
return ascending ? getStartTime(deviceId).get() : getEndTime(deviceId).get();
if (timeIndex instanceof ArrayDeviceTimeIndex) {
return ascending
? timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE)
: timeIndex.getEndTime(deviceId).orElse(Long.MAX_VALUE);
} else {
return ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
}
}

// can use FileTimeIndex
public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) {
if (!definitelyNotContains(deviceId)) {
// checked above
//noinspection OptionalGetWithoutIsPresent
return ascending ? getStartTime(deviceId).get() : getEndTime(deviceId).get();
if (timeIndex instanceof ArrayDeviceTimeIndex) {
if (ascending) {
return timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE);
} else {
return timeIndex.getEndTime(deviceId).orElse(Long.MAX_VALUE);
}
} else {
return ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
// FileTimeIndex
return ascending ? getFileStartTime() : getFileEndTime();
}
}

Expand Down Expand Up @@ -1000,21 +1003,22 @@ public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq,
return false;
}

// check above
long startTime = getStartTime(deviceId).get();
long endTime = isClosed() || !isSeq ? getEndTime(deviceId).get() : Long.MAX_VALUE;
if (startTime > endTime) {
// startTime > endTime indicates that there is something wrong with this TsFile. Return false
// directly, or it may lead to infinite loop in GroupByMonthFilter#getTimePointPosition.
LOGGER.warn(
"startTime[{}] of TsFileResource[{}] is greater than its endTime[{}]",
startTime,
this,
endTime);
return false;
}

if (timeFilter != null) {
// check above
long startTime = getStartTime(deviceId).get();
long endTime = isClosed() || !isSeq ? getEndTime(deviceId).get() : Long.MAX_VALUE;
if (startTime > endTime) {
// startTime > endTime indicates that there is something wrong with this TsFile. Return
// false
// directly, or it may lead to infinite loop in GroupByMonthFilter#getTimePointPosition.
LOGGER.warn(
"startTime[{}] of TsFileResource[{}] is greater than its endTime[{}]",
startTime,
this,
endTime);
return false;
}

boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
if (debug && !res) {
DEBUG_LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,22 @@ public void putEndTime(IDeviceID deviceId, long time) {

@Override
public Optional<Long> getStartTime(IDeviceID deviceId) {
if (!deviceToIndex.containsKey(deviceId)) {
Integer index = deviceToIndex.get(deviceId);
if (index == null) {
return Optional.empty();
} else {
return Optional.of(startTimes[index]);
}
return Optional.of(startTimes[deviceToIndex.get(deviceId)]);
}

@Override
public Optional<Long> getEndTime(IDeviceID deviceId) {
if (!deviceToIndex.containsKey(deviceId)) {
Integer index = deviceToIndex.get(deviceId);
if (index == null) {
return Optional.empty();
} else {
return Optional.of(endTimes[index]);
}
return Optional.of(endTimes[deviceToIndex.get(deviceId)]);
}

@Override
Expand Down
Loading