diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java index 80abed705af9..77e8965a213a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.common; import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.query.exception.EarlyTerminationException; public interface Operator { @@ -28,7 +29,10 @@ public interface Operator { *

For filter operator and operators above projection phase (aggregation, selection, combine etc.), method should * only be called once, and will return a non-null block. *

For operators in projection phase (docIdSet, projection, transformExpression), method can be called multiple - * times, and will return non-empty block or null if no more documents available

+ * times, and will return non-empty block or null if no more documents available + * + * @throws EarlyTerminationException if the operator is early-terminated (interrupted) before processing the next + * block of data. Operator can early terminated when the query times out, or is already satisfied. */ T nextBlock(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index 0ad8d1eaaf09..d0a4d09ac41b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -20,6 +20,7 @@ import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.util.trace.TraceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,7 @@ public abstract class BaseOperator implements Operator { @Override public final T nextBlock() { if (Thread.interrupted()) { - throw new RuntimeException("Thread has been interrupted"); + throw new EarlyTerminationException(); } if (TraceContext.traceEnabled()) { long start = System.currentTimeMillis(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java index c15342ea9ed1..0508142eb2a8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java @@ -43,6 +43,7 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.util.trace.TraceRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +177,8 @@ public void runJob() { }); } } + } catch (EarlyTerminationException e) { + // Early-terminated because query times out or is already satisfied } catch (Exception e) { LOGGER.error("Exception processing CombineGroupBy for index {}, operator {}", index, _operators.get(index).getClass().getName(), e); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java index a06090d2e75a..1c7ff67c935a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java @@ -43,6 +43,7 @@ import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.spi.utils.BytesUtils; @@ -181,6 +182,8 @@ public void runJob() { _indexedTable.upsert(key, record); } } + } catch (EarlyTerminationException e) { + // Early-terminated because query times out or is already satisfied } catch (Exception e) { LOGGER.error("Exception processing CombineGroupByOrderBy for index {}, operator {}", index, _operators.get(index).getClass().getName(), e); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java index 80509161d37f..b9dcdb634c70 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.request.Selection; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.query.reduce.CombineService; import org.apache.pinot.core.util.trace.TraceCallable; import org.apache.pinot.core.util.trace.TraceRunnable; @@ -117,6 +118,8 @@ public void runJob() { } } blockingQueue.offer(mergedBlock); + } catch (EarlyTerminationException e) { + // Early-terminated because query times out or is already satisfied } catch (Exception e) { LOGGER.error("Caught exception while executing query.", e); blockingQueue.offer(new IntermediateResultsBlock(e)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/exception/EarlyTerminationException.java b/pinot-core/src/main/java/org/apache/pinot/core/query/exception/EarlyTerminationException.java new file mode 100644 index 000000000000..fe88a062ac0c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/exception/EarlyTerminationException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.exception; + +import org.apache.pinot.core.common.Operator; + + +/** + * The {@code EarlyTerminationException} can be thrown from {@link Operator#nextBlock()} when the operator is early + * terminated (interrupted). + */ +public class EarlyTerminationException extends RuntimeException { + public EarlyTerminationException() { + super(); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java index 9c4aceff575e..eebd32d10223 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.pql.parsers.Pql2Compiler; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -83,7 +84,11 @@ public void testCombineGroupByOrderByOperator() { testCombineOperator(operators, combineGroupByOrderByOperator); } - public void testCombineOperator(List operators, BaseOperator combineOperator) { + /** + * NOTE: It is hard to test the logger behavior, but only one error message about the query timeout should be logged + * for each query. + */ + private void testCombineOperator(List operators, BaseOperator combineOperator) { IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) combineOperator.nextBlock(); List processingExceptions = intermediateResultsBlock.getProcessingExceptions(); assertNotNull(processingExceptions); @@ -122,8 +127,8 @@ protected Block getNextBlock() { try { Thread.sleep(3_600_000L); } catch (InterruptedException e) { - // Thread should be interrupted - throw new RuntimeException(e); + // Thread should be interrupted for early-termination + throw new EarlyTerminationException(); } finally { // Wait for 100 milliseconds before marking the operation done try {