From 2c87ecd719b307ad385c4a73f848f14157b88e7b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 26 Mar 2026 23:01:05 -0700 Subject: [PATCH 1/2] fix: Improved error handling in SeekableStreamIndexTaskRunner. The main improvement is that "persist" is moved out of a finally block, and now only happens on the normal path. This has two benefits. First, there is no point in persisting on the error path, and the in-memory index might be in a bad state anyway at that point. Second, moving the persist call out of "finally" fixes an issue where an exception thrown from "persist" would cause an exception thrown from "add" to be lost. This can come up in production when the in-memory index grows too large, causing the main code to throw an OutOfMemoryError, and then something goes wrong with the persist too. In this situation the original OutOfMemoryError would not have been logged. A secondary improvement is that we catch Throwable rather than Exception, both when triggering cleanup and when handling errors that occur during cleanup. This ensures we don't miss cleanup tasks when an Error is thrown by the main code, and that we don't lose the original exception if an Error is thrown by the cleanup code. 
--- .../SeekableStreamIndexTaskRunner.java | 38 +++++++------------ 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 0f986dfc9c4b..c4fa66e1b9e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -445,7 +445,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ) ); - Throwable caughtExceptionOuter = null; + Throwable caughtException = null; //milliseconds waited for created segments to be handed off long handoffWaitMs = 0L; @@ -607,8 +607,6 @@ public void run() // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); status = Status.READING; - Throwable caughtExceptionInner = null; - try { while (stillReading) { if (possiblyPause()) { @@ -809,9 +807,8 @@ public void onFailure(Throwable t) } } } - catch (Exception e) { + catch (Throwable e) { // (1) catch all exceptions while reading from kafka - caughtExceptionInner = e; if (Throwables.getRootCause(e) instanceof InterruptedException) { // Suppress InterruptedException stack trace to avoid flooding the logs log.error("Encounted InterrupedException in run() before persisting"); @@ -821,20 +818,11 @@ public void onFailure(Throwable t) throw e; } finally { - try { - // To handle cases where tasks stop reading due to stop request or exceptions - segmentGenerationMetrics.markProcessingDone(); - driver.persist(committerSupplier.get()); // persist pending data - } - catch (Exception e) { - if (caughtExceptionInner != null) { - caughtExceptionInner.addSuppressed(e); - } else { - throw e; - } - } + 
segmentGenerationMetrics.markProcessingDone(); } + driver.persist(committerSupplier.get()); // persist pending data + synchronized (statusLock) { if (stopRequested.get() && !publishOnStop.get()) { throw new InterruptedException("Stopping without publishing"); @@ -861,7 +849,7 @@ public void onFailure(Throwable t) // Committer is built.) sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord); publishingSequences.add(sequenceMetadata.getSequenceName()); - // persist already done in finally, so directly add to publishQueue + // persist already done above, so directly add to publishQueue publishAndRegisterHandoff(sequenceMetadata); } } @@ -913,7 +901,7 @@ public void onFailure(Throwable t) catch (InterruptedException | RejectedExecutionException e) { // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including // the final publishing. - caughtExceptionOuter = e; + caughtException = e; try { Futures.allAsList(publishWaitList).cancel(true); Futures.allAsList(handOffWaitList).cancel(true); @@ -921,7 +909,7 @@ public void onFailure(Throwable t) appenderator.closeNow(); } } - catch (Exception e2) { + catch (Throwable e2) { e.addSuppressed(e2); } @@ -937,9 +925,9 @@ public void onFailure(Throwable t) throw e; } } - catch (Exception e) { + catch (Throwable e) { // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. 
- caughtExceptionOuter = e; + caughtException = e; try { Futures.allAsList(publishWaitList).cancel(true); Futures.allAsList(handOffWaitList).cancel(true); @@ -947,7 +935,7 @@ public void onFailure(Throwable t) appenderator.closeNow(); } } - catch (Exception e2) { + catch (Throwable e2) { e.addSuppressed(e2); } throw e; @@ -966,8 +954,8 @@ public void onFailure(Throwable t) rejectionPeriodUpdaterExec.shutdown(); } catch (Throwable e) { - if (caughtExceptionOuter != null) { - caughtExceptionOuter.addSuppressed(e); + if (caughtException != null) { + caughtException.addSuppressed(e); } else { throw e; } From 209cbf8b6d5baa6913b2c389ae46af2a57cd648e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 27 Mar 2026 03:52:45 -0700 Subject: [PATCH 2/2] No persists on errors. --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 4c0ca3284d3a..1374cc4abf71 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1752,8 +1752,8 @@ public void testMultipleParseExceptionsFailure() throws Exception final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); - Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); - Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists()); + Assert.assertEquals(0, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(0, observedSegmentGenerationMetrics.numPersists()); Assert.assertEquals(0, 
observedSegmentGenerationMetrics.handOffCount()); }