Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -821,13 +821,20 @@ public List<ByteBuffer> getGroupByResult(long executorId, long startTime, long e
return resultBuffers;
}

/**
* returns a non-nul ByteBuffer as thrift response, which not allows null objects. If the
* ByteBuffer data equals <0, null>, it means that the NextNotNullValue is null.
*/
public ByteBuffer peekNextNotNullValue(long executorId, long startTime, long endTime)
throws ReaderNotFoundException, IOException {
GroupByExecutor executor = queryManager.getGroupByExecutor(executorId);
if (executor == null) {
throw new ReaderNotFoundException(executorId);
}
Pair<Long, Object> pair = executor.peekNextNotNullValue(startTime, endTime);
if (pair == null) {
pair = new Pair<>(0L, null);
}
ByteBuffer resultBuffer;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndT
if (aggrBuffer != null) {
long time = aggrBuffer.getLong();
Object o = SerializeUtils.deserializeObject(aggrBuffer);
result = new Pair<>(time, o);
if (o != null) {
result = new Pair<>(time, o);
}
}
logger.debug(
"Fetched peekNextNotNullValue from {} of [{}, {}]: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember met
}

@Override
protected List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
public List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
List<PartialPath> seriesPaths,
List<TSDataType> dataTypes,
QueryContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
public class GroupByFillDataSet extends QueryDataSet {

private GroupByEngineDataSet groupByEngineDataSet;
private final LastQueryExecutor lastQueryExecutor;
private Map<TSDataType, IFill> fillTypes;
// the first value for each time series
private Object[] previousValue;
Expand All @@ -60,10 +61,12 @@ public GroupByFillDataSet(
GroupByEngineDataSet groupByEngineDataSet,
Map<TSDataType, IFill> fillTypes,
QueryContext context,
GroupByTimeFillPlan groupByFillPlan)
GroupByTimeFillPlan groupByFillPlan,
LastQueryExecutor lastQueryExecutor)
throws StorageEngineException, IOException, QueryProcessException {
super(new ArrayList<>(paths), dataTypes, groupByFillPlan.isAscending());
this.groupByEngineDataSet = groupByEngineDataSet;
this.lastQueryExecutor = lastQueryExecutor;
this.fillTypes = fillTypes;
List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(paths);
try {
Expand Down Expand Up @@ -125,8 +128,8 @@ private void initLastTimeArray(QueryContext context, GroupByTimeFillPlan groupBy
seriesPaths.add((PartialPath) paths.get(i));
}
List<Pair<Boolean, TimeValuePair>> lastValueContainer =
LastQueryExecutor.calculateLastPairForSeriesLocally(
seriesPaths, dataTypes, context, null, groupByFillPlan.getDeviceToMeasurements());
lastQueryExecutor.calculateLastPairForSeries(
seriesPaths, dataTypes, context, null, groupByFillPlan);
for (int i = 0; i < lastValueContainer.size(); i++) {
if (Boolean.TRUE.equals(lastValueContainer.get(i).left)) {
lastTimeArray[i] = lastValueContainer.get(i).right.getTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public QueryDataSet execute(QueryContext context, LastQueryPlan lastQueryPlan)
return dataSet;
}

protected List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
public List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
List<PartialPath> seriesPaths,
List<TSDataType> dataTypes,
QueryContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,16 @@ public QueryDataSet groupByFill(GroupByTimeFillPlan groupByFillPlan, QueryContex
IOException {
GroupByEngineDataSet groupByEngineDataSet =
(GroupByEngineDataSet) groupBy(groupByFillPlan, context);
// here we pass an empty LastQueryPlan as we don't depend on it but only to generate a
// LastQueryExecutor
return new GroupByFillDataSet(
groupByFillPlan.getDeduplicatedPaths(),
groupByFillPlan.getDeduplicatedDataTypes(),
groupByEngineDataSet,
groupByFillPlan.getFillType(),
context,
groupByFillPlan);
groupByFillPlan,
getLastQueryExecutor(new LastQueryPlan()));
}

@Override
Expand Down