Skip to content

WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output channel in runIncrementally()#17373

Merged
LakshSingla merged 3 commits intoapache:masterfrom
Akshat-Jain:msq-wf-channel-has-no-capacity
Oct 23, 2024
Merged

WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output channel in runIncrementally()#17373
LakshSingla merged 3 commits intoapache:masterfrom
Akshat-Jain:msq-wf-channel-has-no-capacity

Conversation

@Akshat-Jain
Copy link
Contributor

Description

With the changes made in #17038, we missed a flow where multiple frames could be written to the output channel in a single iteration of WindowOperatorQueryFrameProcessor#runIncrementally. This violates the contract of runIncrementally() and leads to the following error: Channel has no capacity

Sample stacktrace
2024-10-18T00:06:27,410 WARN [MultiStageQuery-test-controller-client] org.apache.druid.msq.exec.WorkerImpl - Work failed; stage 2; task query-dummy-worker0_0; host 123:8080: UnknownError: java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
	at org.apache.druid.java.util.common.Either.valueOrThrow(Either.java:95)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:271)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.run(FrameProcessorExecutor.java:141)
	at org.apache.druid.msq.exec.WorkerImpl$2$2.run(WorkerImpl.java:900)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.druid.java.util.common.ISE: Channel has no capacity
	at org.apache.druid.frame.channel.BlockingQueueFrameChannel$Writable.write(BlockingQueueFrameChannel.java:139)
	at org.apache.druid.msq.indexing.CountingWritableFrameChannel.write(CountingWritableFrameChannel.java:50)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushFrameWriter(WindowOperatorQueryFrameProcessor.java:302)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.writeRacToFrame(WindowOperatorQueryFrameProcessor.java:262)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushAllRowsAndCols(WindowOperatorQueryFrameProcessor.java:232)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.runIncrementally(WindowOperatorQueryFrameProcessor.java:150)
	at org.apache.druid.msq.counters.CpuTimeAccumulatingFrameProcessor.runIncrementally(CpuTimeAccumulatingFrameProcessor.java:66)
	at org.apache.druid.frame.processor.FrameProcessors$1FrameProcessorWithBaggage.runIncrementally(FrameProcessors.java:72)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:239)
	... 5 more

We missed this flow:

  1. We read the input channel
  2. We call runAllOpsOnBatch()
  3. If after step (1), the input channel was finished, then the operatorChain's receiver's completed() method will get called, which would call flushAllRowsAndCols(), hence writing to the output channel.
  4. After this, we end up calling flushAllRowsAndCols() again, missing the fact that there's a chance that the input channel might have finished. This ends up attempting to write another frame to the output channel, causing the Channel has no capacity error.

This PR fixes the above problematic flow by checking if the input channel is finished. If it's finished, we re-run runIncrementally() instead of step 4.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 17, 2024
@LakshSingla LakshSingla merged commit 1e96c85 into apache:master Oct 23, 2024
@adarshsanjeev adarshsanjeev added this to the 32.0.0 milestone Jan 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants