diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 4c0ca3284d3a..1374cc4abf71 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1752,8 +1752,8 @@ public void testMultipleParseExceptionsFailure() throws Exception final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); - Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); - Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists()); + Assert.assertEquals(0, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(0, observedSegmentGenerationMetrics.numPersists()); Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 0f986dfc9c4b..c4fa66e1b9e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -445,7 +445,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ) ); - Throwable caughtExceptionOuter = null; + Throwable caughtException = null; //milliseconds waited for created segments to be handed off long handoffWaitMs = 0L; @@ -607,8 +607,6 @@ public void run() // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); status = Status.READING; - Throwable caughtExceptionInner = null; - try { while (stillReading) { if (possiblyPause()) { @@ -809,9 +807,8 @@ public void onFailure(Throwable t) } } } - catch (Exception e) { + catch (Throwable e) { // (1) catch all exceptions while reading from kafka - caughtExceptionInner = e; if (Throwables.getRootCause(e) instanceof InterruptedException) { // Suppress InterruptedException stack trace to avoid flooding the logs log.error("Encounted InterrupedException in run() before persisting"); @@ -821,20 +818,11 @@ public void onFailure(Throwable t) throw e; } finally { - try { - // To handle cases where tasks stop reading due to stop request or exceptions - segmentGenerationMetrics.markProcessingDone(); - driver.persist(committerSupplier.get()); // persist pending data - } - catch (Exception e) { - if (caughtExceptionInner != null) { - caughtExceptionInner.addSuppressed(e); - } else { - throw e; - } - } + segmentGenerationMetrics.markProcessingDone(); } + driver.persist(committerSupplier.get()); // persist pending data + synchronized (statusLock) { if (stopRequested.get() && !publishOnStop.get()) { throw new InterruptedException("Stopping without publishing"); @@ -861,7 +849,7 @@ public void onFailure(Throwable t) // Committer is built.) sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord); publishingSequences.add(sequenceMetadata.getSequenceName()); - // persist already done in finally, so directly add to publishQueue + // persist already done above, so directly add to publishQueue publishAndRegisterHandoff(sequenceMetadata); } } @@ -913,7 +901,7 @@ public void onFailure(Throwable t) catch (InterruptedException | RejectedExecutionException e) { // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including // the final publishing. - caughtExceptionOuter = e; + caughtException = e; try { Futures.allAsList(publishWaitList).cancel(true); Futures.allAsList(handOffWaitList).cancel(true); @@ -921,7 +909,7 @@ public void onFailure(Throwable t) appenderator.closeNow(); } } - catch (Exception e2) { + catch (Throwable e2) { e.addSuppressed(e2); } @@ -937,9 +925,9 @@ public void onFailure(Throwable t) throw e; } } - catch (Exception e) { + catch (Throwable e) { // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. - caughtExceptionOuter = e; + caughtException = e; try { Futures.allAsList(publishWaitList).cancel(true); Futures.allAsList(handOffWaitList).cancel(true); @@ -947,7 +935,7 @@ public void onFailure(Throwable t) appenderator.closeNow(); } } - catch (Exception e2) { + catch (Throwable e2) { e.addSuppressed(e2); } throw e; @@ -966,8 +954,8 @@ public void onFailure(Throwable t) rejectionPeriodUpdaterExec.shutdown(); } catch (Throwable e) { - if (caughtExceptionOuter != null) { - caughtExceptionOuter.addSuppressed(e); + if (caughtException != null) { + caughtException.addSuppressed(e); } else { throw e; }