Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ public void testAlignedRawDataAlignByTime1() {
};
resultSetEqualTest(
"select s2 from root.sg1.d1 where s2 - 1 >= 9 and s2 < 30", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d1.s2,";
String[] retArray4 = new String[] {"14,14,", "15,15,"};
resultSetEqualTest(
"select s2 from root.sg1.d1 where s2 - 1 >= 9 and s2 < 30 offset 3 limit 2",
expectedHeader4,
retArray4);
}

@Test
Expand Down Expand Up @@ -164,6 +171,11 @@ public void testAlignedRawDataAlignByTime2() {
"30,30,",
};
resultSetEqualTest("select s3 from root.sg1.d1 where s3 + 1 > 16", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d1.s3,";
String[] retArray4 = new String[] {"3,30000,", "13,130000,", "16,16,"};
resultSetEqualTest(
"select s3 from root.sg1.d1 where s3 + 1 > 16 limit 3", expectedHeader4, retArray4);
}

@Test
Expand Down Expand Up @@ -203,6 +215,13 @@ public void testNonAlignedRawDataAlignByTime1() {
};
resultSetEqualTest(
"select s2 from root.sg1.d2 where s2 - 1 >= 9 and s2 < 30", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d2.s2,";
String[] retArray4 = new String[] {"12,12,", "13,13,", "14,14,"};
resultSetEqualTest(
"select s2 from root.sg1.d2 where s2 - 1 >= 9 and s2 < 30 limit 3 offset 2",
expectedHeader4,
retArray4);
}

@Test
Expand Down Expand Up @@ -257,6 +276,14 @@ public void testNonAlignedRawDataAlignByTime2() {
"25,25,", "26,26,", "27,27,", "28,28,", "29,29,", "30,30,",
};
resultSetEqualTest("select s3 from root.sg1.d2 where s3 + 1 > 16", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d2.s3,";
String[] retArray4 =
new String[] {
"26,26,", "27,27,", "28,28,", "29,29,", "30,30,",
};
resultSetEqualTest(
"select s3 from root.sg1.d2 where s3 + 1 > 16 offset 10", expectedHeader4, retArray4);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(context);
scanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());

Expression pushDownPredicate = node.getPushDownPredicate();
boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate);
Expand All @@ -357,6 +355,10 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c
context.getTypeProvider(),
context.getZoneId()));
}
if (pushDownPredicate == null || predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
}

OperatorContext operatorContext =
context
Expand All @@ -380,17 +382,43 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c

if (!predicateCanPushIntoScan) {
checkState(!context.isBuildPlanUseTemplate(), "Push down predicate is not supported yet");
return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
Collections.singletonList(ExpressionFactory.timeSeries(node.getSeriesPath()))
.toArray(new Expression[0]),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
Operator rootOperator =
constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
Collections.singletonList(ExpressionFactory.timeSeries(node.getSeriesPath()))
.toArray(new Expression[0]),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
if (node.getPushDownOffset() > 0) {
rootOperator =
new OffsetOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName()),
node.getPushDownOffset(),
rootOperator);
}
if (node.getPushDownLimit() > 0) {
rootOperator =
new LimitOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LimitOperator.class.getSimpleName()),
node.getPushDownLimit(),
rootOperator);
}
return rootOperator;
}
return seriesScanOperator;
}
Expand All @@ -401,8 +429,6 @@ public Operator visitAlignedSeriesScan(
AlignedPath seriesPath = node.getAlignedPath();

SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(context);
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
scanOptionsBuilder.withAllSensors(
new HashSet<>(
context.isBuildPlanUseTemplate()
Expand All @@ -420,6 +446,10 @@ public Operator visitAlignedSeriesScan(
context.getTypeProvider(),
context.getZoneId()));
}
if (pushDownPredicate == null || predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
}

OperatorContext operatorContext =
context
Expand Down Expand Up @@ -477,16 +507,43 @@ public Operator visitAlignedSeriesScan(
dataTypes.add(alignedPath.getSubMeasurementDataType(i));
}

return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
expressions.toArray(new Expression[0]),
dataTypes,
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
Operator rootOperator =
constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
expressions.toArray(new Expression[0]),
dataTypes,
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);

if (node.getPushDownOffset() > 0) {
rootOperator =
new OffsetOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName()),
node.getPushDownOffset(),
rootOperator);
}
if (node.getPushDownLimit() > 0) {
rootOperator =
new LimitOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LimitOperator.class.getSimpleName()),
node.getPushDownLimit(),
rootOperator);
}
return rootOperator;
}
return seriesScanOperator;
}
Expand Down