From eddfb3afc46ca9b0747ed57aca8dd5b50e4d4519 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 13 Jan 2016 10:09:10 -0800 Subject: [PATCH 1/3] KAFKA-2886: handle sink task rebalance failures by stopping worker task --- .../apache/kafka/connect/sink/SinkTask.java | 7 ++- .../kafka/connect/runtime/WorkerSinkTask.java | 56 +++++++++++++++++-- .../connect/runtime/WorkerSinkTaskThread.java | 54 +++++++++--------- 3 files changed, 81 insertions(+), 36 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 7e793c893ab54..85ce88acada8a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -80,7 +80,7 @@ public void initialize(SinkTaskContext context) { * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. * This method will be called after partition re-assignment completes and before the SinkTask starts - * fetching data. + * fetching data. Note that any errors raised from this method will cause the task to stop. * @param partitions The list of partitions that are now assigned to the task (may include * partitions previously assigned to the task) */ @@ -88,9 +88,10 @@ public void onPartitionsAssigned(Collection partitions) { } /** - * The SinkTask use this method to close writers and commit offsets for partitions that are + * The SinkTask use this method to close writers and commit offsets for partitions that are no * longer assigned to the SinkTask. This method will be called before a rebalance operation starts - * and after the SinkTask stops fetching data. + * and after the SinkTask stops fetching data. Note that any errors raised from this method will cause + * the task to stop. * @param partitions The list of partitions that were assigned to the consumer on the last * rebalance */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index a67d0afc2aded..bb51813b42f2a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -71,6 +71,7 @@ class WorkerSinkTask implements WorkerTask { private Map lastCommittedOffsets; private Map currentOffsets; private boolean pausedForRedelivery; + private RuntimeException rebalanceException; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, Converter keyConverter, Converter valueConverter, Time time) { @@ -84,6 +85,7 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.pausedForRedelivery = false; + this.rebalanceException = null; } @Override @@ -145,7 +147,7 @@ public boolean joinConsumerGroupAndStart() { // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions // to work with. Any rewinding will be handled immediately when polling starts. try { - consumer.poll(0); + pollConsumer(0); } catch (WakeupException e) { log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); return false; @@ -168,7 +170,7 @@ public void poll(long timeoutMs) { } log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); - ConsumerRecords msgs = consumer.poll(timeoutMs); + ConsumerRecords msgs = pollConsumer(timeoutMs); assert messageBatch.isEmpty() || msgs.isEmpty(); log.trace("{} polling returned {} messages", id, msgs.count()); @@ -237,6 +239,19 @@ public String toString() { '}'; } + private ConsumerRecords pollConsumer(long timeoutMs) { + ConsumerRecords msgs = consumer.poll(timeoutMs); + + // Exceptions raised from the task during a rebalance should be rethrown to stop the worker + if (rebalanceException != null) { + RuntimeException e = rebalanceException; + rebalanceException = null; + throw e; + } + + return msgs; + } + private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task @@ -329,9 +344,24 @@ private void rewind() { context.clearOffsets(); } + protected void abort(Throwable t) { + // Shutdown the task now and close resources. This should only be called from the worker's task thread + // TODO: Report the exception to the Worker so that it can be propagated or the task can be restarted + + log.error("{} stopping due to unexpected exception", id, t); + try { + task.stop(); + } finally { + close(); + } + } + private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { + if (rebalanceException != null) + return; + lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { @@ -365,16 +395,30 @@ public void onPartitionsAssigned(Collection partitions) { // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (started) - task.onPartitionsAssigned(partitions); + if (started) { + try { + task.onPartitionsAssigned(partitions); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. + rebalanceException = e; + } + } } @Override public void onPartitionsRevoked(Collection partitions) { if (started) { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); + try { + task.onPartitionsRevoked(partitions); + commitOffsets(true, -1); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. + rebalanceException = e; + } } + // Make sure we don't have any leftover data since offsets will be reset to committed positions messageBatch.clear(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java index b65efa86da951..e868940950725 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java @@ -50,16 +50,20 @@ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, @Override public void execute() { - // Try to join and start. If we're interrupted before this completes, bail. - if (!task.joinConsumerGroupAndStart()) - return; + try { + // Try to join and start. If we're interrupted before this completes, bail. + if (!task.joinConsumerGroupAndStart()) + return; - while (getRunning()) { - iteration(); - } + while (getRunning()) { + iteration(); + } - // Make sure any uncommitted data has committed - task.commitOffsets(true, -1); + // Make sure any uncommitted data has committed + task.commitOffsets(true, -1); + } catch (Throwable t) { + task.abort(t); + } } public void iteration() { @@ -67,11 +71,9 @@ public void iteration() { // Maybe commit if (!committing && now >= nextCommit) { - synchronized (this) { - committing = true; - commitSeqno += 1; - commitStarted = now; - } + committing = true; + commitSeqno += 1; + commitStarted = now; task.commitOffsets(false, commitSeqno); nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); } @@ -91,22 +93,20 @@ public void iteration() { } public void onCommitCompleted(Throwable error, long seqno) { - synchronized (this) { - if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); + if (commitSeqno != seqno) { + log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", + this, + seqno, commitSeqno); + } else { + if (error != null) { + log.error("Commit of {} offsets threw an unexpected exception: ", task, error); + commitFailures++; } else { - if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", task, error); - commitFailures++; - } else { - log.debug("Finished {} offset commit successfully in {} ms", - task, task.time().milliseconds() - commitStarted); - commitFailures = 0; - } - committing = false; + log.debug("Finished {} offset commit successfully in {} ms", + task, task.time().milliseconds() - commitStarted); + commitFailures = 0; } + committing = false; } } From d6381d82bb9aa31fad6b8811465f7c64949f1d5d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 13 Jan 2016 11:45:10 -0800 Subject: [PATCH 2/3] Add test cases to verify exceptions in rebalance are rethrown --- .../connect/runtime/WorkerSinkTaskTest.java | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index d5eaace85b938..305a61ee38078 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; @@ -55,6 +56,9 @@ import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + @RunWith(PowerMockRunner.class) @PrepareForTest(WorkerSinkTask.class) @PowerMockIgnore("javax.management.*") @@ -156,6 +160,48 @@ public void testPollRedelivery() throws Exception { PowerMock.verifyAll(); } + @Test + public void testErrorInRebalancePartitionRevocation() throws Exception { + RuntimeException exception = new RuntimeException("Revocation error"); + + expectInitializeTask(); + expectRebalanceRevocationError(exception); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + try { + workerTask.poll(Long.MAX_VALUE); + fail("Poll should have raised the rebalance exception"); + } catch (RuntimeException e) { + assertEquals(exception, e); + } + + PowerMock.verifyAll(); + } + + @Test + public void testErrorInRebalancePartitionAssignment() throws Exception { + RuntimeException exception = new RuntimeException("Assignment error"); + + expectInitializeTask(); + expectRebalanceAssignmentError(exception); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + try { + workerTask.poll(Long.MAX_VALUE); + fail("Poll should have raised the rebalance exception"); + } catch (RuntimeException e) { + assertEquals(exception, e); + } + + PowerMock.verifyAll(); + } + private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); @@ -183,6 +229,54 @@ public ConsumerRecords answer() throws Throwable { PowerMock.expectLastCall(); } + private void expectRebalanceRevocationError(RuntimeException e) { + final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.onPartitionsRevoked(partitions); + EasyMock.expectLastCall().andThrow(e); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer>() { + @Override + public ConsumerRecords answer() throws Throwable { + rebalanceListener.getValue().onPartitionsRevoked(partitions); + return ConsumerRecords.empty(); + } + }); + } + + private void expectRebalanceAssignmentError(RuntimeException e) { + final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.onPartitionsRevoked(partitions); + EasyMock.expectLastCall(); + + sinkTask.flush(EasyMock.>anyObject()); + EasyMock.expectLastCall(); + + consumer.commitSync(EasyMock.>anyObject()); + EasyMock.expectLastCall(); + + workerThread.onCommitCompleted(EasyMock.isNull(), EasyMock.anyLong()); + EasyMock.expectLastCall(); + + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + + sinkTask.onPartitionsAssigned(partitions); + EasyMock.expectLastCall().andThrow(e); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer>() { + @Override + public ConsumerRecords answer() throws Throwable { + rebalanceListener.getValue().onPartitionsRevoked(partitions); + rebalanceListener.getValue().onPartitionsAssigned(partitions); + return ConsumerRecords.empty(); + } + }); + } + private void expectConsumerPoll(final int numMessages) { EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer>() { From 51df519ea6718b1a43bce163e3673519f17b14f3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 14 Jan 2016 17:55:00 -0800 Subject: [PATCH 3/3] remove call to abort in WorkerSinkTask --- .../kafka/connect/runtime/WorkerSinkTask.java | 12 ----------- .../connect/runtime/WorkerSinkTaskThread.java | 20 ++++++++----------- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index bb51813b42f2a..f48a734d06f60 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -344,18 +344,6 @@ private void rewind() { context.clearOffsets(); } - protected void abort(Throwable t) { - // Shutdown the task now and close resources. This should only be called from the worker's task thread - // TODO: Report the exception to the Worker so that it can be propagated or the task can be restarted - - log.error("{} stopping due to unexpected exception", id, t); - try { - task.stop(); - } finally { - close(); - } - } - private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java index e868940950725..93e210a38661b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java @@ -50,20 +50,16 @@ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, @Override public void execute() { - try { - // Try to join and start. If we're interrupted before this completes, bail. - if (!task.joinConsumerGroupAndStart()) - return; + // Try to join and start. If we're interrupted before this completes, bail. + if (!task.joinConsumerGroupAndStart()) + return; - while (getRunning()) { - iteration(); - } - - // Make sure any uncommitted data has committed - task.commitOffsets(true, -1); - } catch (Throwable t) { - task.abort(t); + while (getRunning()) { + iteration(); } + + // Make sure any uncommitted data has committed + task.commitOffsets(true, -1); } public void iteration() {