diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java index 21ff962ad6abc..78cda6b40e6b6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java @@ -577,6 +577,22 @@ public void templateInvalidTest() { "select count(s1+1) from root.sg1.** align by device;", expectedHeader, retArray); } + @Test + public void emptyResultTest() { + String[] expectedHeader = new String[] {"Time,Device,s3,s1,s2"}; + String[] retArray = new String[] {}; + resultSetEqualTest( + "SELECT * FROM root.sg1.** where time>=now()-1d and time<=now() " + + "ORDER BY TIME DESC ALIGN BY DEVICE;", + expectedHeader, + retArray); + resultSetEqualTest( + "SELECT * FROM root.sg2.** where time>=now()-1d and time<=now() " + + "ORDER BY TIME DESC ALIGN BY DEVICE;", + expectedHeader, + retArray); + } + private static void insertData() { try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java index d14741a31d658..d59067463f9d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java @@ -80,6 +80,8 @@ public class TopKOperator implements ProcessOperator { // the data of every childOperator is in order private final boolean childrenDataInOrder; + public static int operatorBatchUpperBound = 100000; + public TopKOperator( OperatorContext operatorContext, List deviceOperators, @@ -98,7 +100,10 @@ public TopKOperator( initResultTsBlock(); - deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 / topValue + 1; + deviceBatchStep = + operatorBatchUpperBound % topValue == 0 + ? operatorBatchUpperBound / topValue + : operatorBatchUpperBound / topValue + 1; canCallNext = new boolean[deviceOperators.size()]; } @@ -137,7 +142,14 @@ public boolean isFinished() throws Exception { @Override public boolean hasNext() throws Exception { - return !(deviceIndex >= deviceOperators.size() && resultReturnSize >= topKResult.length); + if (deviceIndex >= deviceOperators.size()) { + if (topKResult == null) { + return false; + } + + return resultReturnSize < topKResult.length; + } + return true; } @Override @@ -289,10 +301,13 @@ private TsBlock getResultFromCachedTopKResult() { } tsBlockBuilder.reset(); + + if (topKResult == null || topKResult.length == 0) { + return tsBlockBuilder.build(); + } + ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); - for (int i = resultReturnSize, size = (topKResult == null ? 0 : topKResult.length); - i < size; - i++) { + for (int i = resultReturnSize; i < topKResult.length; i++) { MergeSortKey mergeSortKey = topKResult[i]; TsBlock targetBlock = mergeSortKey.tsBlock; tsBlockBuilder diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index a66b1518c9292..5ec7997fdfcd7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -171,6 +171,7 @@ public static boolean canBuildPlanUseTemplate( // generate result set header according to output expressions analyzeOutput(analysis, queryStatement, outputExpressions); + context.generateGlobalTimeFilter(analysis); // fetch partition information analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter()); return true; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java index 2a48c94159fac..62ba3e689714d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java @@ -197,9 +197,7 @@ public TopKOperator topKOperatorTest( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1)); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator1 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath2 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32); @@ -211,9 +209,7 @@ public TopKOperator topKOperatorTest( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2)); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator2 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath3 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32); @@ -225,9 +221,7 @@ public TopKOperator topKOperatorTest( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3)); seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator3 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath4 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32); @@ -239,9 +233,7 @@ public TopKOperator topKOperatorTest( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4)); seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator4 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath5 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); @@ -253,9 +245,7 @@ public TopKOperator topKOperatorTest( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5)); seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator5 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); List tsDataTypes = new LinkedList<>( @@ -292,9 +282,7 @@ public TopKOperator topKOperatorTest( ? new AscTimeComparator() : new DescTimeComparator())), timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); - timeJoinOperator1 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); SingleDeviceViewOperator singleDeviceViewOperator2 = new SingleDeviceViewOperator( @@ -322,9 +310,7 @@ public TopKOperator topKOperatorTest( ? new AscTimeComparator() : new DescTimeComparator())), timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); - timeJoinOperator2 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); SingleDeviceViewOperator singleDeviceViewOperator3 = new SingleDeviceViewOperator( @@ -348,7 +334,7 @@ public TopKOperator topKOperatorTest( Arrays.asList(TSDataType.INT64, TSDataType.TEXT)), limitValue, true); - topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); return topKOperator; } catch (IllegalPathException e) { e.printStackTrace(); @@ -913,9 +899,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1)); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator1 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath2 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32); @@ -927,9 +911,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2)); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator2 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath3 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32); @@ -941,9 +923,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3)); seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator3 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath4 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32); @@ -955,9 +935,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4)); seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator4 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath5 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); @@ -969,9 +947,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5)); seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator5 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath6 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device3.sensor0", TSDataType.INT32); @@ -983,9 +959,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath6)); seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator6 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath7 = new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device3.sensor1", TSDataType.INT32); @@ -997,9 +971,7 @@ public TopKOperator topKOperatorTest3( timeOrdering, SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath7)); seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); - seriesScanOperator7 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); List tsDataTypes = new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32)); @@ -1021,9 +993,7 @@ public TopKOperator topKOperatorTest3( ? new AscTimeComparator() : new DescTimeComparator())), timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); - timeJoinOperator1 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); RowBasedTimeJoinOperator timeJoinOperator2 = new RowBasedTimeJoinOperator( driverContext.getOperatorContexts().get(8), @@ -1042,9 +1012,7 @@ public TopKOperator topKOperatorTest3( ? new AscTimeComparator() : new DescTimeComparator())), timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); - timeJoinOperator2 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); RowBasedTimeJoinOperator timeJoinOperator3 = new RowBasedTimeJoinOperator( driverContext.getOperatorContexts().get(9), @@ -1063,9 +1031,7 @@ public TopKOperator topKOperatorTest3( ? new AscTimeComparator() : new DescTimeComparator())), timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); - timeJoinOperator3 - .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); List devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1, DEVICE2, DEVICE3)); if (deviceOrdering == Ordering.DESC) { Collections.reverse(devices); @@ -1111,7 +1077,7 @@ public TopKOperator topKOperatorTest3( Arrays.asList(TSDataType.TEXT, TSDataType.INT64)), limitValue, false); - topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); return topKOperator; } catch (IllegalPathException e) { e.printStackTrace(); @@ -1343,7 +1309,7 @@ public void topKOperatorSortTest() throws Exception { Collections.singletonList(TSDataType.INT64)), 20, true); - topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); int index = 0; while (topKOperator.isBlocked().isDone() && topKOperator.hasNext()) { @@ -1358,4 +1324,227 @@ public void topKOperatorSortTest() throws Exception { } assertEquals(index, ans.length); } + + // ---------------------------------------------------------------------------------------------- + // order by time - all result of SeriesScan is empty + // ---------------------------------------------------------------------------------------------- + // TopKOperator + // ____________|_______________ + // / | \ + // SingleDeviceViewOperator SingleDeviceViewOperator SingleDeviceViewOperator + // / | \ + // SeriesScanOperator TimeJoinOperator TimeJoinOperator + // / \ / \ + // SeriesScanOperator SeriesScanOperator SeriesScanOperator SeriesScanOperator + // ---------------------------------------------------------------------------------------------- + public TopKOperator emptyTopKOperatorTest( + Ordering timeOrdering, Ordering deviceOrdering, int limitValue) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + // Construct operator tree + QueryId queryId = new QueryId("stub_query"); + + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId3 = new PlanNodeId("3"); + driverContext.addOperatorContext(3, planNodeId3, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId4 = new PlanNodeId("4"); + driverContext.addOperatorContext(4, planNodeId4, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId5 = new PlanNodeId("5"); + driverContext.addOperatorContext(5, planNodeId5, SeriesScanOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 6, new PlanNodeId("6"), SingleDeviceViewOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 7, new PlanNodeId("7"), RowBasedTimeJoinOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 8, new PlanNodeId("8"), SingleDeviceViewOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 9, new PlanNodeId("9"), RowBasedTimeJoinOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 10, new PlanNodeId("10"), SingleDeviceViewOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 11, new PlanNodeId("11"), TopKOperator.class.getSimpleName()); + + MeasurementPath measurementPath1 = + new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(0), + planNodeId1, + measurementPath1, + timeOrdering, + SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1)); + seriesScanOperator1.initQueryDataSource( + new QueryDataSource(Collections.emptyList(), Collections.emptyList())); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + MeasurementPath measurementPath2 = + new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(1), + planNodeId2, + measurementPath2, + timeOrdering, + SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2)); + seriesScanOperator2.initQueryDataSource( + new QueryDataSource(Collections.emptyList(), Collections.emptyList())); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + MeasurementPath measurementPath3 = + new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator3 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(2), + planNodeId3, + measurementPath3, + timeOrdering, + SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3)); + seriesScanOperator3.initQueryDataSource( + new QueryDataSource(Collections.emptyList(), Collections.emptyList())); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + MeasurementPath measurementPath4 = + new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32); + SeriesScanOperator seriesScanOperator4 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(3), + planNodeId4, + measurementPath4, + timeOrdering, + SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4)); + seriesScanOperator4.initQueryDataSource( + new QueryDataSource(Collections.emptyList(), Collections.emptyList())); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + MeasurementPath measurementPath5 = + new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator5 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(4), + planNodeId5, + measurementPath5, + timeOrdering, + SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5)); + seriesScanOperator5.initQueryDataSource( + new QueryDataSource(Collections.emptyList(), Collections.emptyList())); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + List tsDataTypes = + new LinkedList<>( + Arrays.asList( + TSDataType.TEXT, + TSDataType.INT32, + TSDataType.INT32, + TSDataType.INT32, + TSDataType.INT32, + TSDataType.INT32)); + SingleDeviceViewOperator singleDeviceViewOperator1 = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(5), + DEVICE0, + seriesScanOperator1, + Collections.singletonList(1), + tsDataTypes); + + RowBasedTimeJoinOperator timeJoinOperator1 = + new RowBasedTimeJoinOperator( + driverContext.getOperatorContexts().get(6), + Arrays.asList(seriesScanOperator2, seriesScanOperator3), + timeOrdering, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + Arrays.asList( + new SingleColumnMerger( + new InputLocation(0, 0), + timeOrdering == Ordering.ASC + ? new AscTimeComparator() + : new DescTimeComparator()), + new SingleColumnMerger( + new InputLocation(1, 0), + timeOrdering == Ordering.ASC + ? new AscTimeComparator() + : new DescTimeComparator())), + timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + SingleDeviceViewOperator singleDeviceViewOperator2 = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(7), + DEVICE1, + timeJoinOperator1, + Arrays.asList(2, 3), + tsDataTypes); + + RowBasedTimeJoinOperator timeJoinOperator2 = + new RowBasedTimeJoinOperator( + driverContext.getOperatorContexts().get(8), + Arrays.asList(seriesScanOperator4, seriesScanOperator5), + timeOrdering, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + Arrays.asList( + new SingleColumnMerger( + new InputLocation(0, 0), + timeOrdering == Ordering.ASC + ? new AscTimeComparator() + : new DescTimeComparator()), + new SingleColumnMerger( + new InputLocation(1, 0), + timeOrdering == Ordering.ASC + ? new AscTimeComparator() + : new DescTimeComparator())), + timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator()); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + SingleDeviceViewOperator singleDeviceViewOperator3 = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(9), + DEVICE2, + timeJoinOperator2, + Arrays.asList(4, 5), + tsDataTypes); + + TopKOperator topKOperator = + new TopKOperator( + driverContext.getOperatorContexts().get(10), + Arrays.asList( + singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3), + tsDataTypes, + MergeSortComparator.getComparator( + Arrays.asList( + new SortItem(OrderByKey.TIME, timeOrdering), + new SortItem(OrderByKey.DEVICE, deviceOrdering)), + Arrays.asList(-1, 0), + Arrays.asList(TSDataType.INT64, TSDataType.TEXT)), + limitValue, + true); + OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + return topKOperator; + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + return null; + } + } + + @Test + public void testEmptyTopKOperator() throws Exception { + TopKOperator topKOperator = emptyTopKOperatorTest(Ordering.ASC, Ordering.ASC, limitValue); + while (topKOperator.isBlocked().isDone() && topKOperator.hasNext()) { + TsBlock tsBlock = topKOperator.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + fail(); + } + } }