From 6d262240a8a9d01e99388e8b2f3cf2d45ff7d57d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 21 Jan 2016 16:45:44 -0800 Subject: [PATCH 1/9] Combine WorkerSinkTask and WorkerSinkTaskThread and refactor as a Runnable --- .../connect/runtime/AbstractWorkerTask.java | 92 ++++++++++ .../apache/kafka/connect/runtime/Worker.java | 24 ++- .../kafka/connect/runtime/WorkerSinkTask.java | 165 +++++++++++------- .../connect/runtime/WorkerSinkTaskThread.java | 112 ------------ .../connect/runtime/WorkerSourceTask.java | 152 +++++++--------- .../kafka/connect/runtime/WorkerTask.java | 12 +- .../connect/runtime/WorkerSinkTaskTest.java | 18 +- .../runtime/WorkerSinkTaskThreadedTest.java | 101 +++++------ .../connect/runtime/WorkerSourceTaskTest.java | 15 +- .../kafka/connect/runtime/WorkerTest.java | 6 +- 10 files changed, 332 insertions(+), 365 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java delete mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java new file mode 100644 index 0000000000000..ef9b66ebf1966 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +abstract class AbstractWorkerTask implements WorkerTask { + private static final Logger log = LoggerFactory.getLogger(AbstractWorkerTask.class); + + protected final ConnectorTaskId id; + private final AtomicBoolean stopping; + private final AtomicBoolean running; + private final CountDownLatch shutdownLatch; + + public AbstractWorkerTask(ConnectorTaskId id) { + this.id = id; + this.stopping = new AtomicBoolean(false); + this.running = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + + protected abstract void execute(); + + protected abstract void close(); + + protected boolean isStopping() { + return stopping.get(); + } + + private void doClose() { + try { + close(); + } catch (Throwable t) { + log.error("Unhandled exception in task shutdown {}", id, t); + } finally { + running.set(false); + shutdownLatch.countDown(); + } + } + + @Override + public void run() { + if (!this.running.compareAndSet(false, true)) + return; + + try { + execute(); + } catch (Throwable t) { + log.error("Unhandled exception in task {}", id, t); + } finally { + doClose(); + } + } + + @Override + public void stop() { + this.stopping.set(true); + } + + @Override + public boolean awaitStop(long timeoutMs) { + if (!running.get()) + return true; + + try { + return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 88b4c10ea8f7a..c7ff8554e639c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -17,11 +17,11 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; @@ -33,8 +33,8 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; -import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; +import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.reflections.Reflections; import org.reflections.util.ClasspathHelper; @@ -48,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** @@ -62,8 +64,10 @@ public class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); - private Time time; - private WorkerConfig config; + private final ExecutorService executor; + private final Time time; + private final WorkerConfig config; + private Converter keyConverter; private Converter valueConverter; private Converter internalKeyConverter; @@ -80,6 +84,7 @@ public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) { @SuppressWarnings("unchecked") public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) { + this.executor = Executors.newCachedThreadPool(); this.time = time; this.config = config; this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); @@ -154,7 +159,6 @@ public void stop() { log.debug("Waiting for task {} to finish shutting down", task); if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) log.error("Graceful shutdown of task {} failed.", task); - task.close(); } long timeoutMs = limit - time.milliseconds(); @@ -324,7 +328,7 @@ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); // Decide which type of worker task we need based on the type of task. - final WorkerTask workerTask; + final AbstractWorkerTask workerTask; if (task instanceof SourceTask) { SourceTask sourceTask = (SourceTask) task; OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), @@ -342,7 +346,9 @@ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { // Start the task before adding modifying any state, any exceptions are caught higher up the // call chain and there's no cleanup to do here - workerTask.start(taskConfig.originalsStrings()); + workerTask.initialize(taskConfig.originalsStrings()); + executor.submit(workerTask); + if (task instanceof SourceTask) { WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; sourceTaskOffsetCommitter.schedule(id, workerSourceTask); @@ -367,7 +373,6 @@ public void stopTask(ConnectorTaskId id) { task.stop(); if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) log.error("Graceful stop of task {} failed.", task); - task.close(); tasks.remove(id); } @@ -394,4 +399,5 @@ public Converter getInternalKeyConverter() { public Converter getInternalValueConverter() { return internalValueConverter; } + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index f48a734d06f60..c19c6dffaa845 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -48,36 +48,43 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; /** * WorkerTask that uses a SinkTask to export data from Kafka. */ -class WorkerSinkTask implements WorkerTask { +class WorkerSinkTask extends AbstractWorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - private final ConnectorTaskId id; - private final SinkTask task; private final WorkerConfig workerConfig; + private final SinkTask task; + private Map taskConfig; private final Time time; private final Converter keyConverter; private final Converter valueConverter; - private WorkerSinkTaskThread workThread; - private Map taskProps; private KafkaConsumer consumer; private WorkerSinkTaskContext context; - private boolean started; private final List messageBatch; private Map lastCommittedOffsets; private Map currentOffsets; - private boolean pausedForRedelivery; private RuntimeException rebalanceException; + private long nextCommit; + private int commitSeqno; + private long commitStarted; + private int commitFailures; + private boolean pausedForRedelivery; + private boolean committing; + private boolean started; + + public WorkerSinkTask(ConnectorTaskId id, + SinkTask task, + WorkerConfig workerConfig, + Converter keyConverter, + Converter valueConverter, + Time time) { + super(id); - public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, - Converter keyConverter, Converter valueConverter, Time time) { - this.id = id; - this.task = task; this.workerConfig = workerConfig; + this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.time = time; @@ -86,58 +93,106 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf this.currentOffsets = new HashMap<>(); this.pausedForRedelivery = false; this.rebalanceException = null; + this.nextCommit = time.milliseconds() + + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + this.committing = false; + this.commitSeqno = 0; + this.commitStarted = -1; + this.commitFailures = 0; } @Override - public void start(Map props) { - taskProps = props; - consumer = createConsumer(); - context = new WorkerSinkTaskContext(consumer); - - workThread = createWorkerThread(); - workThread.start(); + public void initialize(Map taskConfig) { + this.taskConfig = taskConfig; + this.consumer = createConsumer(); + this.context = new WorkerSinkTaskContext(consumer); } @Override public void stop() { // Offset commit is handled upon exit in work thread - if (workThread != null) - workThread.startGracefulShutdown(); + super.stop(); consumer.wakeup(); } @Override - public boolean awaitStop(long timeoutMs) { - boolean success = true; - if (workThread != null) { - try { - success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); - if (!success) - workThread.forceShutdown(); - } catch (InterruptedException e) { - success = false; - } - } - task.stop(); - return success; - } - - @Override - public void close() { + protected void close() { // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout // passed in + task.stop(); if (consumer != null) consumer.close(); } + @Override + public void execute() { + // Try to join and start. If we're interrupted before this completes, bail. + if (!joinConsumerGroupAndStart()) + return; + + while (!isStopping()) + iteration(); + + // Make sure any uncommitted data has committed + commitOffsets(true, -1); + } + + protected void iteration() { + long now = time.milliseconds(); + + // Maybe commit + if (!committing && now >= nextCommit) { + committing = true; + commitSeqno += 1; + commitStarted = now; + commitOffsets(false, commitSeqno); + nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + } + + // Check for timed out commits + long commitTimeout = commitStarted + workerConfig.getLong( + WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + if (committing && now >= commitTimeout) { + log.warn("Commit of {} offsets timed out", this); + commitFailures++; + committing = false; + } + + // And process messages + long timeoutMs = Math.max(nextCommit - now, 0); + poll(timeoutMs); + } + + private void onCommitCompleted(Throwable error, long seqno) { + if (commitSeqno != seqno) { + log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", + this, + seqno, commitSeqno); + } else { + if (error != null) { + log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + commitFailures++; + } else { + log.debug("Finished {} offset commit successfully in {} ms", + this, time.milliseconds() - commitStarted); + commitFailures = 0; + } + committing = false; + } + } + + public int commitFailures() { + return commitFailures; + } + /** * Performs initial join process for consumer group, ensures we have an assignment, and initializes + starts the * SinkTask. * * @returns true if successful, false if joining the consumer group was interrupted */ - public boolean joinConsumerGroupAndStart() { - String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG); + protected boolean joinConsumerGroupAndStart() { + String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new ConnectException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); @@ -153,14 +208,14 @@ public boolean joinConsumerGroupAndStart() { return false; } task.initialize(context); - task.start(taskProps); + task.start(taskConfig); log.info("Sink task {} finished initialization and start", this); started = true; return true; } /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ - public void poll(long timeoutMs) { + protected void poll(long timeoutMs) { try { rewind(); long retryTimeout = context.timeout(); @@ -185,7 +240,7 @@ public void poll(long timeoutMs) { * Starts an offset commit by flushing outstanding messages from the task and then starting * the write commit. This should only be invoked by the WorkerSinkTaskThread. **/ - public void commitOffsets(boolean sync, final int seqno) { + private void commitOffsets(boolean sync, final int seqno) { log.info("{} Committing offsets", this); final Map offsets = new HashMap<>(currentOffsets); @@ -200,7 +255,7 @@ public void commitOffsets(boolean sync, final int seqno) { consumer.seek(entry.getKey(), entry.getValue().offset()); } currentOffsets = new HashMap<>(lastCommittedOffsets); - workThread.onCommitCompleted(t, seqno); + onCommitCompleted(t, seqno); return; } @@ -208,30 +263,22 @@ public void commitOffsets(boolean sync, final int seqno) { try { consumer.commitSync(offsets); lastCommittedOffsets = offsets; - workThread.onCommitCompleted(null, seqno); + onCommitCompleted(null, seqno); } catch (KafkaException e) { - workThread.onCommitCompleted(e, seqno); + onCommitCompleted(e, seqno); } } else { OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception error) { lastCommittedOffsets = offsets; - workThread.onCommitCompleted(error, seqno); + onCommitCompleted(error, seqno); } }; consumer.commitAsync(offsets, cb); } } - public Time time() { - return time; - } - - public WorkerConfig workerConfig() { - return workerConfig; - } - @Override public String toString() { return "WorkerSinkTask{" + @@ -277,10 +324,6 @@ private KafkaConsumer createConsumer() { return newConsumer; } - private WorkerSinkTaskThread createWorkerThread() { - return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); - } - private void convertMessages(ConsumerRecords msgs) { for (ConsumerRecord msg : msgs) { log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); @@ -321,8 +364,8 @@ private void deliverMessages() { consumer.pause(tp); // Let this exit normally, the batch will be reprocessed on the next loop. } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id); - log.error("Task is being killed and will not recover until manually restarted:", t); + log.error("Task {} threw an uncaught and unrecoverable exception", id, t); + log.error("Task is being killed and will not recover until manually restarted:"); throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception."); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java deleted file mode 100644 index 93e210a38661b..0000000000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.connect.runtime; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.util.ShutdownableThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to - * simplify testing. - */ -class WorkerSinkTaskThread extends ShutdownableThread { - private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - - private final WorkerSinkTask task; - private long nextCommit; - private boolean committing; - private int commitSeqno; - private long commitStarted; - private int commitFailures; - - public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, - WorkerConfig workerConfig) { - super(name); - this.task = task; - this.nextCommit = time.milliseconds() + - workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - this.committing = false; - this.commitSeqno = 0; - this.commitStarted = -1; - this.commitFailures = 0; - } - - @Override - public void execute() { - // Try to join and start. If we're interrupted before this completes, bail. - if (!task.joinConsumerGroupAndStart()) - return; - - while (getRunning()) { - iteration(); - } - - // Make sure any uncommitted data has committed - task.commitOffsets(true, -1); - } - - public void iteration() { - long now = task.time().milliseconds(); - - // Maybe commit - if (!committing && now >= nextCommit) { - committing = true; - commitSeqno += 1; - commitStarted = now; - task.commitOffsets(false, commitSeqno); - nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - } - - // Check for timed out commits - long commitTimeout = commitStarted + task.workerConfig().getLong( - WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); - if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", task); - commitFailures++; - committing = false; - } - - // And process messages - long timeoutMs = Math.max(nextCommit - now, 0); - task.poll(timeoutMs); - } - - public void onCommitCompleted(Throwable error, long seqno) { - if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); - } else { - if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", task, error); - commitFailures++; - } else { - log.debug("Finished {} offset commit successfully in {} ms", - task, task.time().milliseconds() - commitStarted); - commitFailures = 0; - } - committing = false; - } - } - - public int commitFailures() { - return commitFailures; - } -} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6c61d79bad66d..78419a805ea09 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -31,7 +31,6 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.ShutdownableThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,20 +46,18 @@ /** * WorkerTask that uses a SourceTask to ingest data into Kafka. */ -class WorkerSourceTask implements WorkerTask { +class WorkerSourceTask extends AbstractWorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); private static final long SEND_FAILED_BACKOFF_MS = 100; - private final ConnectorTaskId id; + private final WorkerConfig workerConfig; private final SourceTask task; private final Converter keyConverter; private final Converter valueConverter; private KafkaProducer producer; - private WorkerSourceTaskThread workThread; private final OffsetStorageReader offsetReader; private final OffsetStorageWriter offsetWriter; - private final WorkerConfig workerConfig; private final Time time; private List toSend; @@ -73,19 +70,28 @@ class WorkerSourceTask implements WorkerTask { private boolean flushing; private CountDownLatch stopRequestedLatch; - public WorkerSourceTask(ConnectorTaskId id, SourceTask task, - Converter keyConverter, Converter valueConverter, + private Map taskConfig; + private boolean finishedStart = false; + private boolean startedShutdownBeforeStartCompleted = false; + + public WorkerSourceTask(ConnectorTaskId id, + SourceTask task, + Converter keyConverter, + Converter valueConverter, KafkaProducer producer, - OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, - WorkerConfig workerConfig, Time time) { - this.id = id; + OffsetStorageReader offsetReader, + OffsetStorageWriter offsetWriter, + WorkerConfig workerConfig, + Time time) { + super(id); + + this.workerConfig = workerConfig; this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.producer = producer; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.workerConfig = workerConfig; this.time = time; this.toSend = null; @@ -97,37 +103,60 @@ public WorkerSourceTask(ConnectorTaskId id, SourceTask task, } @Override - public void start(Map props) { - workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props); - workThread.start(); + public void initialize(Map config) { + this.taskConfig = config; + } + + protected void close() { + // nothing to do } @Override public void stop() { - if (workThread != null) { - workThread.startGracefulShutdown(); - stopRequestedLatch.countDown(); + super.stop(); + stopRequestedLatch.countDown(); + synchronized (this) { + if (finishedStart) + task.stop(); + else + startedShutdownBeforeStartCompleted = true; } } @Override - public boolean awaitStop(long timeoutMs) { - boolean success = true; - if (workThread != null) { - try { - success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); - if (!success) - workThread.forceShutdown(); - } catch (InterruptedException e) { - success = false; + public void execute() { + try { + task.initialize(new WorkerSourceTaskContext(offsetReader)); + task.start(taskConfig); + log.info("Source task {} finished initialization and start", this); + synchronized (this) { + if (startedShutdownBeforeStartCompleted) { + task.stop(); + return; + } + finishedStart = true; } + + while (!isStopping()) { + if (toSend == null) + toSend = task.poll(); + if (toSend == null) + continue; + if (!sendRecords()) + stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + // Ignore and allow to exit. + } catch (Throwable t) { + log.error("Task {} threw an uncaught and unrecoverable exception", id); + log.error("Task is being killed and will not recover until manually restarted:", t); + // It should still be safe to let this fall through and commit offsets since this exception would have + // simply resulted in not getting more records but all the existing records should be ok to flush + // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit + // to fail. } - return success; - } - @Override - public void close() { - // Nothing to do + commitOffsets(); } /** @@ -323,67 +352,6 @@ private synchronized void finishSuccessfulFlush() { flushing = false; } - - private class WorkerSourceTaskThread extends ShutdownableThread { - private Map workerProps; - private boolean finishedStart; - private boolean startedShutdownBeforeStartCompleted; - - public WorkerSourceTaskThread(String name, Map workerProps) { - super(name); - this.workerProps = workerProps; - this.finishedStart = false; - this.startedShutdownBeforeStartCompleted = false; - } - - @Override - public void execute() { - try { - task.initialize(new WorkerSourceTaskContext(offsetReader)); - task.start(workerProps); - log.info("Source task {} finished initialization and start", this); - synchronized (this) { - if (startedShutdownBeforeStartCompleted) { - task.stop(); - return; - } - finishedStart = true; - } - - while (getRunning()) { - if (toSend == null) - toSend = task.poll(); - if (toSend == null) - continue; - if (!sendRecords()) - stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); - } - } catch (InterruptedException e) { - // Ignore and allow to exit. - } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id); - log.error("Task is being killed and will not recover until manually restarted:", t); - // It should still be safe to let this fall through and commit offsets since this exception would have - // simply resulted in not getting more records but all the existing records should be ok to flush - // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit - // to fail. - } - - commitOffsets(); - } - - @Override - public void startGracefulShutdown() { - super.startGracefulShutdown(); - synchronized (this) { - if (finishedStart) - task.stop(); - else - startedShutdownBeforeStartCompleted = true; - } - } - } - @Override public String toString() { return "WorkerSourceTask{" + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 66fc45bb68b6b..c5e74ebc884cd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -24,12 +24,12 @@ * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with * Kafka to create a data flow. */ -interface WorkerTask { +interface WorkerTask extends Runnable { /** - * Start the Task + * Initialize the task for execution. * @param props initial configuration */ - void start(Map props); + void initialize(Map props); /** * Stop this task from processing messages. This method does not block, it only triggers @@ -45,10 +45,4 @@ interface WorkerTask { */ boolean awaitStop(long timeoutMs); - /** - * Close this task. This is different from #{@link #stop} and #{@link #awaitStop} in that the - * stop methods ensure processing has stopped but may leave resources allocated. This method - * should clean up all resources. - */ - void close(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 305a61ee38078..765777ee620c1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -97,8 +97,6 @@ public class WorkerSinkTaskTest { @Mock private Converter valueConverter; @Mock - private WorkerSinkTaskThread workerThread; - @Mock private KafkaConsumer consumer; private Capture rebalanceListener = EasyMock.newCapture(); @@ -116,7 +114,7 @@ public void setUp() { workerProps.put("internal.value.converter.schemas.enable", "false"); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); recordsReturned = 0; @@ -152,7 +150,7 @@ public void testPollRedelivery() throws Exception { PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); workerTask.poll(Long.MAX_VALUE); workerTask.poll(Long.MAX_VALUE); @@ -169,7 +167,7 @@ public void testErrorInRebalancePartitionRevocation() throws Exception { PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); try { workerTask.poll(Long.MAX_VALUE); @@ -190,7 +188,7 @@ public void testErrorInRebalancePartitionAssignment() throws Exception { PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); try { workerTask.poll(Long.MAX_VALUE); @@ -205,11 +203,6 @@ public void testErrorInRebalancePartitionAssignment() throws Exception { private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -257,9 +250,6 @@ private void expectRebalanceAssignmentError(RuntimeException e) { consumer.commitSync(EasyMock.>anyObject()); EasyMock.expectLastCall(); - workerThread.onCommitCompleted(EasyMock.isNull(), EasyMock.anyLong()); - EasyMock.expectLastCall(); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 69156310d294b..05d7dadd06ed2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -56,7 +56,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -100,7 +99,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { private Converter valueConverter; private WorkerSinkTask workerTask; @Mock private KafkaConsumer consumer; - private WorkerSinkTaskThread workerThread; private Capture rebalanceListener = EasyMock.newCapture(); private long recordsReturned; @@ -119,7 +117,7 @@ public void setup() { workerProps.put("internal.value.converter.schemas.enable", "false"); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); recordsReturned = 0; @@ -130,14 +128,13 @@ public void testPollsInBackground() throws Exception { expectInitializeTask(); Capture> capturedRecords = expectPolls(1L); expectStopTask(10L); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); for (int i = 0; i < 10; i++) { - workerThread.iteration(); + workerTask.iteration(); } workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); @@ -168,18 +165,17 @@ public void testCommit() throws Exception { = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, null, 0, true); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); // First iteration gets one record - workerThread.iteration(); + workerTask.iteration(); // Second triggers commit, gets a second offset - workerThread.iteration(); + workerTask.iteration(); // Commit finishes synchronously for testing so we can check this immediately - assertEquals(0, workerThread.commitFailures()); + assertEquals(0, workerTask.commitFailures()); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -203,17 +199,16 @@ public void testCommitTaskFlushFailure() throws Exception { consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); PowerMock.expectLastCall(); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.iteration(); + workerTask.iteration(); + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -237,18 +232,17 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); PowerMock.expectLastCall(); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); // Second iteration triggers first commit, third iteration triggers second (failing) commit - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.iteration(); + workerTask.iteration(); + workerTask.iteration(); + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -263,18 +257,17 @@ public void testCommitConsumerFailure() throws Exception { = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, new Exception(), 0, true); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit - workerThread.iteration(); - workerThread.iteration(); + workerTask.iteration(); + workerTask.iteration(); // TODO Response to consistent failures? - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -290,21 +283,20 @@ public void testCommitTimeout() throws Exception { = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); expectStopTask(4); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't // trigger another commit - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); + workerTask.iteration(); + workerTask.iteration(); + workerTask.iteration(); + workerTask.iteration(); // TODO Response to consistent failures? - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -366,15 +358,14 @@ public Object answer() throws Throwable { PowerMock.expectLastCall(); expectStopTask(0); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); + workerTask.iteration(); + workerTask.iteration(); + workerTask.iteration(); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -410,14 +401,12 @@ public Object answer() throws Throwable { }); expectStopTask(3); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); - PowerMock.replayAll(); - workerTask.start(TASK_PROPS); + workerTask.initialize(TASK_PROPS); workerTask.joinConsumerGroupAndStart(); - workerThread.iteration(); - workerThread.iteration(); + workerTask.iteration(); + workerTask.iteration(); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -428,14 +417,6 @@ public Object answer() throws Throwable { private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); - workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"}, - workerTask, "mock-worker-thread", time, - workerConfig); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -526,10 +507,10 @@ public ConsumerRecords answer() throws Throwable { } private Capture expectOffsetFlush(final long expectedMessages, - final RuntimeException flushError, - final Exception consumerCommitError, - final long consumerCommitDelayMs, - final boolean invokeCallback) + final RuntimeException flushError, + final Exception consumerCommitError, + final long consumerCommitDelayMs, + final boolean invokeCallback) throws Exception { final long finalOffset = FIRST_OFFSET + expectedMessages; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index f16cbebb64ebe..3888534ea0924 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -52,6 +52,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -77,6 +79,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private static final byte[] SERIALIZED_KEY = "converted-key".getBytes(); private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes(); + private ExecutorService executor = Executors.newSingleThreadExecutor(); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private WorkerConfig config; @Mock private SourceTask sourceTask; @@ -132,7 +135,8 @@ public void testPollsInBackground() throws Exception { PowerMock.replayAll(); - workerTask.start(EMPTY_TASK_PROPS); + workerTask.initialize(EMPTY_TASK_PROPS); + executor.submit(workerTask); awaitPolls(pollLatch); workerTask.stop(); assertEquals(true, workerTask.awaitStop(1000)); @@ -160,7 +164,8 @@ public void testCommit() throws Exception { PowerMock.replayAll(); - workerTask.start(EMPTY_TASK_PROPS); + workerTask.initialize(EMPTY_TASK_PROPS); + executor.submit(workerTask); awaitPolls(pollLatch); assertTrue(workerTask.commitOffsets()); workerTask.stop(); @@ -189,7 +194,8 @@ public void testCommitFailure() throws Exception { PowerMock.replayAll(); - workerTask.start(EMPTY_TASK_PROPS); + workerTask.initialize(EMPTY_TASK_PROPS); + executor.submit(workerTask); awaitPolls(pollLatch); assertFalse(workerTask.commitOffsets()); workerTask.stop(); @@ -271,7 +277,8 @@ public Object answer() throws Throwable { PowerMock.replayAll(); - workerTask.start(EMPTY_TASK_PROPS); + workerTask.initialize(EMPTY_TASK_PROPS); + executor.submit(workerTask); // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it // cannot be invoked immediately in the thread trying to stop the task. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 335e0ce7e7477..f33347a431f12 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -354,14 +354,13 @@ public void testAddRemoveTask() throws Exception { .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); - workerTask.start(origProps); + workerTask.initialize(origProps); EasyMock.expectLastCall(); // Remove workerTask.stop(); EasyMock.expectLastCall(); EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); - workerTask.close(); EasyMock.expectLastCall(); offsetBackingStore.stop(); @@ -424,7 +423,7 @@ public void testCleanupTasksOnStop() throws Exception { .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); - workerTask.start(origProps); + workerTask.initialize(origProps); EasyMock.expectLastCall(); // Remove on Worker.stop() @@ -432,7 +431,6 @@ public void testCleanupTasksOnStop() throws Exception { EasyMock.expectLastCall(); EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true); // Note that in this case we *do not* commit offsets since it's an unclean shutdown - workerTask.close(); EasyMock.expectLastCall(); offsetBackingStore.stop(); From 22c04c95bcf2e355376892afc7d2990f5e3cb02f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 26 Jan 2016 13:13:40 -0800 Subject: [PATCH 2/9] Ensure onPartitionsRevoked obeys close semantics --- .../kafka/connect/runtime/WorkerSinkTask.java | 87 ++++++++++++------- .../connect/runtime/WorkerSinkTaskTest.java | 4 +- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index c19c6dffaa845..e19a817d076a8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -130,11 +130,14 @@ public void execute() { if (!joinConsumerGroupAndStart()) return; - while (!isStopping()) - iteration(); - - // Make sure any uncommitted data has committed - commitOffsets(true, -1); + try { + while (!isStopping()) + iteration(); + } finally { + // Make sure any uncommitted data has been committed and the task has + // a chance to clean up its state + closePartitions(); + } } protected void iteration() { @@ -145,7 +148,7 @@ protected void iteration() { committing = true; commitSeqno += 1; commitStarted = now; - commitOffsets(false, commitSeqno); + flushAndCommitOffsets(false, commitSeqno); nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); } @@ -240,25 +243,8 @@ protected void poll(long timeoutMs) { * Starts an offset commit by flushing outstanding messages from the task and then starting * the write commit. This should only be invoked by the WorkerSinkTaskThread. **/ - private void commitOffsets(boolean sync, final int seqno) { + private void commitOffsets(Map offsets, boolean sync, final int seqno) { log.info("{} Committing offsets", this); - - final Map offsets = new HashMap<>(currentOffsets); - - try { - task.flush(offsets); - } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing:", this, t); - log.error("Rewinding offsets to last committed offsets"); - for (Map.Entry entry : lastCommittedOffsets.entrySet()) { - log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); - consumer.seek(entry.getKey(), entry.getValue().offset()); - } - currentOffsets = new HashMap<>(lastCommittedOffsets); - onCommitCompleted(t, seqno); - return; - } - if (sync) { try { consumer.commitSync(offsets); @@ -279,6 +265,25 @@ public void onComplete(Map offsets, Exception } } + private void flushAndCommitOffsets(boolean sync, int seqno) { + Map offsets = new HashMap<>(currentOffsets); + try { + task.flush(offsets); + } catch (Throwable t) { + log.error("Commit of {} offsets failed due to exception while flushing:", this, t); + log.error("Rewinding offsets to last committed offsets"); + for (Map.Entry entry : lastCommittedOffsets.entrySet()) { + log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); + consumer.seek(entry.getKey(), entry.getValue().offset()); + } + currentOffsets = new HashMap<>(lastCommittedOffsets); + onCommitCompleted(t, seqno); + return; + } + + commitOffsets(offsets, sync, seqno); + } + @Override public String toString() { return "WorkerSinkTask{" + @@ -387,12 +392,32 @@ private void rewind() { context.clearOffsets(); } + private void openPartitions(Collection partitions) { + if (partitions.isEmpty()) + return; + + task.onPartitionsAssigned(partitions); + } + + private void closePartitions() { + if (currentOffsets.isEmpty()) + return; + + Map offsets = new HashMap<>(currentOffsets); + try { + task.flush(offsets); + } finally { + // Some sink implementations may not actually flush the data until close is called, + // so the commit should follow it. + task.onPartitionsRevoked(currentOffsets.keySet()); + } + + commitOffsets(offsets, true, -1); + } + private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { - if (rebalanceException != null) - return; - lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { @@ -407,6 +432,7 @@ public void onPartitionsAssigned(Collection partitions) { // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own. if (pausedForRedelivery) { pausedForRedelivery = false; + Set assigned = new HashSet<>(partitions); Set taskPaused = context.pausedPartitions(); @@ -426,9 +452,9 @@ public void onPartitionsAssigned(Collection partitions) { // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (started) { + if (started && rebalanceException == null) { try { - task.onPartitionsAssigned(partitions); + openPartitions(partitions); } catch (RuntimeException e) { // The consumer swallows exceptions raised in the rebalance listener, so we need to store // exceptions and rethrow when poll() returns. @@ -441,8 +467,7 @@ public void onPartitionsAssigned(Collection partitions) { public void onPartitionsRevoked(Collection partitions) { if (started) { try { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); + closePartitions(); } catch (RuntimeException e) { // The consumer swallows exceptions raised in the rebalance listener, so we need to store // exceptions and rethrow when poll() returns. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 765777ee620c1..a3cd5766ab075 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -225,7 +225,7 @@ public ConsumerRecords answer() throws Throwable { private void expectRebalanceRevocationError(RuntimeException e) { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(partitions); + sinkTask.onPartitionsRevoked(new HashSet<>(partitions)); EasyMock.expectLastCall().andThrow(e); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( @@ -241,7 +241,7 @@ public ConsumerRecords answer() throws Throwable { private void expectRebalanceAssignmentError(RuntimeException e) { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(partitions); + sinkTask.onPartitionsRevoked(new HashSet<>(partitions)); EasyMock.expectLastCall(); sinkTask.flush(EasyMock.>anyObject()); From ec9611cfdb05f12a62e304433c819e46feeed321 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 26 Jan 2016 14:45:27 -0800 Subject: [PATCH 3/9] Assignments are only initialized in onPartitionsAssigned --- .../kafka/connect/runtime/WorkerSinkTask.java | 44 +++------ .../connect/runtime/WorkerSinkTaskTest.java | 39 +++++--- .../runtime/WorkerSinkTaskThreadedTest.java | 96 ++++++++++++++----- 3 files changed, 112 insertions(+), 67 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index e19a817d076a8..2591bc025e52b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -73,7 +73,6 @@ class WorkerSinkTask extends AbstractWorkerTask { private int commitFailures; private boolean pausedForRedelivery; private boolean committing; - private boolean started; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, @@ -88,7 +87,6 @@ public WorkerSinkTask(ConnectorTaskId id, this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.time = time; - this.started = false; this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.pausedForRedelivery = false; @@ -126,10 +124,7 @@ protected void close() { @Override public void execute() { - // Try to join and start. If we're interrupted before this completes, bail. - if (!joinConsumerGroupAndStart()) - return; - + initializeAndStart(); try { while (!isStopping()) iteration(); @@ -189,32 +184,18 @@ public int commitFailures() { } /** - * Performs initial join process for consumer group, ensures we have an assignment, and initializes + starts the - * SinkTask. - * - * @returns true if successful, false if joining the consumer group was interrupted + * Initializes and starts the SinkTask. */ - protected boolean joinConsumerGroupAndStart() { + protected void initializeAndStart() { String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new ConnectException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); log.debug("Task {} subscribing to topics {}", id, topics); consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - - // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions - // to work with. Any rewinding will be handled immediately when polling starts. - try { - pollConsumer(0); - } catch (WakeupException e) { - log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); - return false; - } task.initialize(context); task.start(taskConfig); log.info("Sink task {} finished initialization and start", this); - started = true; - return true; } /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ @@ -344,6 +325,9 @@ private void convertMessages(ConsumerRecords msgs) { } private void deliverMessages() { + if (messageBatch.isEmpty()) + return; + // Finally, deliver this batch to the sink try { // Since we reuse the messageBatch buffer, ensure we give the task its own copy @@ -452,7 +436,7 @@ public void onPartitionsAssigned(Collection partitions) { // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (started && rebalanceException == null) { + if (rebalanceException == null) { try { openPartitions(partitions); } catch (RuntimeException e) { @@ -465,14 +449,12 @@ public void onPartitionsAssigned(Collection partitions) { @Override public void onPartitionsRevoked(Collection partitions) { - if (started) { - try { - closePartitions(); - } catch (RuntimeException e) { - // The consumer swallows exceptions raised in the rebalance listener, so we need to store - // exceptions and rethrow when poll() returns. - rebalanceException = e; - } + try { + closePartitions(); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. + rebalanceException = e; } // Make sure we don't have any leftover data since offsets will be reset to committed positions diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index a3cd5766ab075..1f5647b1bed0e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -123,6 +123,7 @@ public void setUp() { @Test public void testPollRedelivery() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime expectConsumerPoll(1); @@ -151,7 +152,8 @@ public void testPollRedelivery() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); workerTask.poll(Long.MAX_VALUE); workerTask.poll(Long.MAX_VALUE); @@ -163,12 +165,14 @@ public void testErrorInRebalancePartitionRevocation() throws Exception { RuntimeException exception = new RuntimeException("Revocation error"); expectInitializeTask(); + expectPollInitialAssignment(); expectRebalanceRevocationError(exception); PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); try { workerTask.poll(Long.MAX_VALUE); fail("Poll should have raised the rebalance exception"); @@ -184,12 +188,14 @@ public void testErrorInRebalancePartitionAssignment() throws Exception { RuntimeException exception = new RuntimeException("Assignment error"); expectInitializeTask(); + expectPollInitialAssignment(); expectRebalanceAssignmentError(exception); PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); try { workerTask.poll(Long.MAX_VALUE); fail("Poll should have raised the rebalance exception"); @@ -206,16 +212,6 @@ private void expectInitializeTask() throws Exception { consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { - @Override - public ConsumerRecords answer() throws Throwable { - rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - return ConsumerRecords.empty(); - } - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); PowerMock.expectLastCall(); sinkTask.start(TASK_PROPS); @@ -267,6 +263,23 @@ public ConsumerRecords answer() throws Throwable { }); } + private void expectPollInitialAssignment() { + final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.onPartitionsAssigned(partitions); + EasyMock.expectLastCall(); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { + @Override + public ConsumerRecords answer() throws Throwable { + rebalanceListener.getValue().onPartitionsAssigned(partitions); + return ConsumerRecords.empty(); + } + }); + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + } + private void expectConsumerPoll(final int numMessages) { EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer>() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 05d7dadd06ed2..411f792232a7c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -55,6 +55,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -126,13 +127,20 @@ public void setup() { @Test public void testPollsInBackground() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture> capturedRecords = expectPolls(1L); expectStopTask(10L); PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); + + // First iteration initializes partition assignment + workerTask.iteration(); + + // Then we iterate to fetch data for (int i = 0; i < 10; i++) { workerTask.iteration(); } @@ -160,6 +168,8 @@ public void testPollsInBackground() throws Exception { @Test public void testCommit() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + // Make each poll() take the offset commit interval Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); @@ -169,11 +179,15 @@ public void testCommit() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // First iteration gets one record + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch one record workerTask.iteration(); - // Second triggers commit, gets a second offset + // Trigger the commit workerTask.iteration(); + // Commit finishes synchronously for testing so we can check this immediately assertEquals(0, workerTask.commitFailures()); workerTask.stop(); @@ -188,6 +202,8 @@ public void testCommit() throws Exception { @Test public void testCommitTaskFlushFailure() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, new RuntimeException(), null, 0, true); // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization @@ -203,10 +219,15 @@ public void testCommitTaskFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers commit + workerTask.initializeAndStart(); + + // Initialize partition assignment workerTask.iteration(); + // Fetch some data workerTask.iteration(); + // Trigger the commit + workerTask.iteration(); + assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); @@ -221,6 +242,7 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { // Validate that we rewind to the correct offsets if a task's flush method throws an exception expectInitializeTask(); + expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, null, 0, true); expectOffsetFlush(2L, new RuntimeException(), null, 0, true); @@ -236,11 +258,17 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers first commit, third iteration triggers second (failing) commit + workerTask.initializeAndStart(); + + // Initialize partition assignment workerTask.iteration(); + // Fetch some data workerTask.iteration(); + // Trigger first commit, workerTask.iteration(); + // Trigger second (failing) commit + workerTask.iteration(); + assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); @@ -253,6 +281,8 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { @Test public void testCommitConsumerFailure() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, new Exception(), 0, true); @@ -261,10 +291,15 @@ public void testCommitConsumerFailure() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers commit + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch some data workerTask.iteration(); + // Trigger commit workerTask.iteration(); + // TODO Response to consistent failures? assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); @@ -278,6 +313,8 @@ public void testCommitConsumerFailure() throws Exception { @Test public void testCommitTimeout() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); @@ -287,13 +324,18 @@ public void testCommitTimeout() throws Exception { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't - // trigger another commit + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch some data workerTask.iteration(); workerTask.iteration(); + // Trigger the commit workerTask.iteration(); + // Trigger the timeout without another commit workerTask.iteration(); + // TODO Response to consistent failures? assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); @@ -362,7 +404,7 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); workerTask.iteration(); workerTask.iteration(); workerTask.iteration(); @@ -376,6 +418,8 @@ public Object answer() throws Throwable { @Test public void testRewind() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + final long startOffset = 40L; final Map offsets = new HashMap<>(); @@ -404,7 +448,8 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.initialize(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initializeAndStart(); + workerTask.iteration(); workerTask.iteration(); workerTask.iteration(); workerTask.stop(); @@ -420,26 +465,31 @@ private void expectInitializeTask() throws Exception { consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); + sinkTask.initialize(EasyMock.capture(sinkTaskContext)); + PowerMock.expectLastCall(); + sinkTask.start(TASK_PROPS); + PowerMock.expectLastCall(); + } + + private void expectPollInitialAssignment() throws Exception { + final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3); + + sinkTask.onPartitionsAssigned(partitions); + EasyMock.expectLastCall(); + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { @Override public ConsumerRecords answer() throws Throwable { - rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)); + rebalanceListener.getValue().onPartitionsAssigned(partitions); return ConsumerRecords.empty(); } }); EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(TASK_PROPS); - PowerMock.expectLastCall(); } private void expectStopTask(final long expectedMessages) throws Exception { - final long finalOffset = FIRST_OFFSET + expectedMessages - 1; - sinkTask.stop(); PowerMock.expectLastCall(); From dc002cc43eacade42e07e78d0b297d31bf548cad Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 26 Jan 2016 16:09:47 -0800 Subject: [PATCH 4/9] Add open/close methods to SinkTask and deprecate onPartitionsAssigned/onPartitionsRevoked --- .../apache/kafka/connect/sink/SinkTask.java | 66 +++++++++++++++---- .../kafka/connect/runtime/WorkerSinkTask.java | 4 +- .../connect/runtime/WorkerSinkTaskTest.java | 8 +-- .../runtime/WorkerSinkTaskThreadedTest.java | 2 +- 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 85ce88acada8a..1110893210fb9 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -25,9 +25,34 @@ import java.util.Map; /** - * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In - * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush} - * to support offset commits. + * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task + * instance is assigned a set of partitions by the Connect framework and will handle all records received + * from those partitions. As records are fetched, they will be written to the downstream system using + * {@link #put(Collection)}. At a minimum, you must implement this interface along with {@link #flush(Map)}, + * which is used to commit the offsets of records forwarded. Generally, after records have been flushed, + * they will not be processed again except in the case of a failure. To avoid message loss, you must ensure + * that flushed records have been successfully written to the downstream system. + * + * Below we describe the lifecycle of a SinkTask. + * + *

    + *
  1. Initialization: SinkTasks are first initialized using {@link #initialize(SinkTaskContext)} + * to prepare the task's context and {@link #start(Map)} to accept configuration and start any services + * needed for processing.
  2. + *
  3. Partition Assignment: After initialization, Connect will assign the task a set of partitions + * using {@link #open(Collection)}. These partitions are owned exclusively by this task until they + * have been closed with {@link #close(Collection)}.
  4. + *
  5. Record Processing: Once partitions have been opened for writing, Connect will begin forwarding + * records from Kafka using the {@link #put(Collection)} API. Periodically, Connect will ask the task + * to flush records using {@link #flush(Map)}. All records with offsets , are considered "committed" + * after flushing
  6. + *
  7. Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. + * When this happens, the currently assigned partitions will be closed with {@link #close(Collection)} and + * the new assignment will be opened using {@link #open(Collection)}.
  8. + *
  9. Shutdown: When the task needs to be shutdown, Connect will close active partitions (if there + * are any) and stop the task using {@link #stop()}
  10. + *
+ * */ @InterfaceStability.Unstable public abstract class SinkTask implements Task { @@ -42,6 +67,11 @@ public abstract class SinkTask implements Task { protected SinkTaskContext context; + /** + * Initialize the context of this task. Note that the partition assignment will be empty until + * Connect has opened the partitions for writing with {@link #open(Collection)}. + * @param context The sink task's context + */ public void initialize(SinkTaskContext context) { this.context = context; } @@ -77,24 +107,38 @@ public void initialize(SinkTaskContext context) { /** * The SinkTask use this method to create writers for newly assigned partitions in case of partition - * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. - * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. - * This method will be called after partition re-assignment completes and before the SinkTask starts + * rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts * fetching data. Note that any errors raised from this method will cause the task to stop. * @param partitions The list of partitions that are now assigned to the task (may include * partitions previously assigned to the task) */ + public void open(Collection partitions) { + this.onPartitionsAssigned(partitions); + } + + /** + * @deprecated Use {@link #open(Collection)} for partition initialization. + */ + @Deprecated public void onPartitionsAssigned(Collection partitions) { } /** - * The SinkTask use this method to close writers and commit offsets for partitions that are no + * The SinkTask use this method to close writers for partitions that are no * longer assigned to the SinkTask. This method will be called before a rebalance operation starts - * and after the SinkTask stops fetching data. Note that any errors raised from this method will cause - * the task to stop. - * @param partitions The list of partitions that were assigned to the consumer on the last - * rebalance + * and after the SinkTask stops fetching data. After being closed, Connect will not write + * any records to the task until a new set of partitions has been opened. Note that any errors raised + * from this method will cause the task to stop. + * @param partitions The list of partitions that should be closed + */ + public void close(Collection partitions) { + this.onPartitionsRevoked(partitions); + } + + /** + * @deprecated Use {@link #close(Collection)} instead for partition cleanup. */ + @Deprecated public void onPartitionsRevoked(Collection partitions) { } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 2591bc025e52b..282e5b29bfaa2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -380,7 +380,7 @@ private void openPartitions(Collection partitions) { if (partitions.isEmpty()) return; - task.onPartitionsAssigned(partitions); + task.open(partitions); } private void closePartitions() { @@ -393,7 +393,7 @@ private void closePartitions() { } finally { // Some sink implementations may not actually flush the data until close is called, // so the commit should follow it. - task.onPartitionsRevoked(currentOffsets.keySet()); + task.close(currentOffsets.keySet()); } commitOffsets(offsets, true, -1); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 1f5647b1bed0e..e72bb2d47b03c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -221,7 +221,7 @@ private void expectInitializeTask() throws Exception { private void expectRebalanceRevocationError(RuntimeException e) { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(new HashSet<>(partitions)); + sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall().andThrow(e); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( @@ -237,7 +237,7 @@ public ConsumerRecords answer() throws Throwable { private void expectRebalanceAssignmentError(RuntimeException e) { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(new HashSet<>(partitions)); + sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall(); sinkTask.flush(EasyMock.>anyObject()); @@ -249,7 +249,7 @@ private void expectRebalanceAssignmentError(RuntimeException e) { EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - sinkTask.onPartitionsAssigned(partitions); + sinkTask.open(partitions); EasyMock.expectLastCall().andThrow(e); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( @@ -266,7 +266,7 @@ public ConsumerRecords answer() throws Throwable { private void expectPollInitialAssignment() { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsAssigned(partitions); + sinkTask.open(partitions); EasyMock.expectLastCall(); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 411f792232a7c..412555735471d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -474,7 +474,7 @@ private void expectInitializeTask() throws Exception { private void expectPollInitialAssignment() throws Exception { final List partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3); - sinkTask.onPartitionsAssigned(partitions); + sinkTask.open(partitions); EasyMock.expectLastCall(); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { From dd43c05d1ff2ae0b064926b35bba33a8d5143e1a Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 26 Jan 2016 17:19:44 -0800 Subject: [PATCH 5/9] ensure SinkTask.put is called after every poll --- .../java/org/apache/kafka/connect/runtime/WorkerSinkTask.java | 3 --- .../org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java | 3 +++ .../kafka/connect/runtime/WorkerSinkTaskThreadedTest.java | 3 +++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 282e5b29bfaa2..ee32b2bba05a9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -325,9 +325,6 @@ private void convertMessages(ConsumerRecords msgs) { } private void deliverMessages() { - if (messageBatch.isEmpty()) - return; - // Finally, deliver this batch to the sink try { // Since we reuse the messageBatch buffer, ensure we give the task its own copy diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index e72bb2d47b03c..04b08b379d931 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -278,6 +278,9 @@ public ConsumerRecords answer() throws Throwable { }); EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + + sinkTask.put(Collections.emptyList()); + EasyMock.expectLastCall(); } private void expectConsumerPoll(final int numMessages) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 412555735471d..3bf653e983136 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -487,6 +487,9 @@ public ConsumerRecords answer() throws Throwable { EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); + + sinkTask.put(Collections.emptyList()); + EasyMock.expectLastCall(); } private void expectStopTask(final long expectedMessages) throws Exception { From 0940e31fe783fa90d5e02fa26242280410b6102a Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Jan 2016 10:48:43 -0800 Subject: [PATCH 6/9] consolidate commit handling for consistency --- .../kafka/connect/runtime/WorkerSinkTask.java | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index ee32b2bba05a9..944f8e5480635 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -140,10 +140,7 @@ protected void iteration() { // Maybe commit if (!committing && now >= nextCommit) { - committing = true; - commitSeqno += 1; - commitStarted = now; - flushAndCommitOffsets(false, commitSeqno); + commitOffsets(now, false); nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); } @@ -222,11 +219,11 @@ protected void poll(long timeoutMs) { /** * Starts an offset commit by flushing outstanding messages from the task and then starting - * the write commit. This should only be invoked by the WorkerSinkTaskThread. + * the write commit. **/ - private void commitOffsets(Map offsets, boolean sync, final int seqno) { + private void doCommit(Map offsets, boolean closing, final int seqno) { log.info("{} Committing offsets", this); - if (sync) { + if (closing) { try { consumer.commitSync(offsets); lastCommittedOffsets = offsets; @@ -246,7 +243,14 @@ public void onComplete(Map offsets, Exception } } - private void flushAndCommitOffsets(boolean sync, int seqno) { + private void commitOffsets(long now, boolean closing) { + if (currentOffsets.isEmpty()) + return; + + committing = true; + commitSeqno += 1; + commitStarted = now; + Map offsets = new HashMap<>(currentOffsets); try { task.flush(offsets); @@ -258,13 +262,19 @@ private void flushAndCommitOffsets(boolean sync, int seqno) { consumer.seek(entry.getKey(), entry.getValue().offset()); } currentOffsets = new HashMap<>(lastCommittedOffsets); - onCommitCompleted(t, seqno); + onCommitCompleted(t, commitSeqno); return; + } finally { + // Close the task if needed before committing the offsets. This is basically the last chance for + // the connector to actually flush data that has been written to it. + if (closing) + task.close(currentOffsets.keySet()); } - commitOffsets(offsets, sync, seqno); + doCommit(offsets, closing, commitSeqno); } + @Override public String toString() { return "WorkerSinkTask{" + @@ -381,19 +391,7 @@ private void openPartitions(Collection partitions) { } private void closePartitions() { - if (currentOffsets.isEmpty()) - return; - - Map offsets = new HashMap<>(currentOffsets); - try { - task.flush(offsets); - } finally { - // Some sink implementations may not actually flush the data until close is called, - // so the commit should follow it. - task.close(currentOffsets.keySet()); - } - - commitOffsets(offsets, true, -1); + commitOffsets(time.milliseconds(), true); } private class HandleRebalance implements ConsumerRebalanceListener { From 5b303dadeb1e5fea9ec30112830051760fdd914f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Jan 2016 11:10:12 -0800 Subject: [PATCH 7/9] merge AbstractWorkerTask into WorkerTask --- .../connect/runtime/AbstractWorkerTask.java | 92 ------------------- .../apache/kafka/connect/runtime/Worker.java | 2 +- .../kafka/connect/runtime/WorkerSinkTask.java | 2 +- .../connect/runtime/WorkerSourceTask.java | 2 +- .../kafka/connect/runtime/WorkerTask.java | 73 ++++++++++++++- 5 files changed, 72 insertions(+), 99 deletions(-) delete mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java deleted file mode 100644 index ef9b66ebf1966..0000000000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerTask.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.connect.runtime; - -import org.apache.kafka.connect.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -abstract class AbstractWorkerTask implements WorkerTask { - private static final Logger log = LoggerFactory.getLogger(AbstractWorkerTask.class); - - protected final ConnectorTaskId id; - private final AtomicBoolean stopping; - private final AtomicBoolean running; - private final CountDownLatch shutdownLatch; - - public AbstractWorkerTask(ConnectorTaskId id) { - this.id = id; - this.stopping = new AtomicBoolean(false); - this.running = new AtomicBoolean(false); - this.shutdownLatch = new CountDownLatch(1); - } - - protected abstract void execute(); - - protected abstract void close(); - - protected boolean isStopping() { - return stopping.get(); - } - - private void doClose() { - try { - close(); - } catch (Throwable t) { - log.error("Unhandled exception in task shutdown {}", id, t); - } finally { - running.set(false); - shutdownLatch.countDown(); - } - } - - @Override - public void run() { - if (!this.running.compareAndSet(false, true)) - return; - - try { - execute(); - } catch (Throwable t) { - log.error("Unhandled exception in task {}", id, t); - } finally { - doClose(); - } - } - - @Override - public void stop() { - this.stopping.set(true); - } - - @Override - public boolean awaitStop(long timeoutMs) { - if (!running.get()) - return true; - - try { - return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - return false; - } - } - -} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index c7ff8554e639c..0a4bb7fd9721a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -328,7 +328,7 @@ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); // Decide which type of worker task we need based on the type of task. - final AbstractWorkerTask workerTask; + final WorkerTask workerTask; if (task instanceof SourceTask) { SourceTask sourceTask = (SourceTask) task; OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 944f8e5480635..a84ee92b36619 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -52,7 +52,7 @@ /** * WorkerTask that uses a SinkTask to export data from Kafka. */ -class WorkerSinkTask extends AbstractWorkerTask { +class WorkerSinkTask extends WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); private final WorkerConfig workerConfig; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 78419a805ea09..30c22620f14ad 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -46,7 +46,7 @@ /** * WorkerTask that uses a SourceTask to ingest data into Kafka. */ -class WorkerSourceTask extends AbstractWorkerTask { +class WorkerSourceTask extends WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); private static final long SEND_FAILED_BACKOFF_MS = 100; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index c5e74ebc884cd..7c16250924575 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -17,25 +17,48 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Handles processing for an individual task. This interface only provides the basic methods * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with * Kafka to create a data flow. */ -interface WorkerTask extends Runnable { +abstract class WorkerTask implements Runnable { + private static final Logger log = LoggerFactory.getLogger(WorkerTask.class); + + protected final ConnectorTaskId id; + private final AtomicBoolean stopping; + private final AtomicBoolean running; + private final CountDownLatch shutdownLatch; + + public WorkerTask(ConnectorTaskId id) { + this.id = id; + this.stopping = new AtomicBoolean(false); + this.running = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + /** * Initialize the task for execution. * @param props initial configuration */ - void initialize(Map props); + public abstract void initialize(Map props); /** * Stop this task from processing messages. This method does not block, it only triggers * shutdown. Use #{@link #awaitStop} to block until completion. */ - void stop(); + public void stop() { + this.stopping.set(true); + } /** * Wait for this task to finish stopping. @@ -43,6 +66,48 @@ interface WorkerTask extends Runnable { * @param timeoutMs * @return true if successful, false if the timeout was reached */ - boolean awaitStop(long timeoutMs); + public boolean awaitStop(long timeoutMs) { + if (!running.get()) + return true; + + try { + return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + + protected abstract void execute(); + + protected abstract void close(); + + protected boolean isStopping() { + return stopping.get(); + } + + private void doClose() { + try { + close(); + } catch (Throwable t) { + log.error("Unhandled exception in task shutdown {}", id, t); + } finally { + running.set(false); + shutdownLatch.countDown(); + } + } + + @Override + public void run() { + if (!this.running.compareAndSet(false, true)) + return; + + try { + execute(); + } catch (Throwable t) { + log.error("Unhandled exception in task {}", id, t); + } finally { + doClose(); + } + } } From 0c8c0664bc1cbb64a8be2ceb94a38fd411643e7f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Jan 2016 11:26:30 -0800 Subject: [PATCH 8/9] fix SinkTask javadoc --- .../java/org/apache/kafka/connect/sink/SinkTask.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 1110893210fb9..3d0becc5f7a12 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -27,11 +27,10 @@ /** * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task * instance is assigned a set of partitions by the Connect framework and will handle all records received - * from those partitions. As records are fetched, they will be written to the downstream system using - * {@link #put(Collection)}. At a minimum, you must implement this interface along with {@link #flush(Map)}, - * which is used to commit the offsets of records forwarded. Generally, after records have been flushed, - * they will not be processed again except in the case of a failure. To avoid message loss, you must ensure - * that flushed records have been successfully written to the downstream system. + * from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the + * {@link #put(Collection)} API, which should either write them to the downstream system or batch them for + * later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are + * actually pushed to the downstream system.. * * Below we describe the lifecycle of a SinkTask. * @@ -44,8 +43,7 @@ * have been closed with {@link #close(Collection)}. *

  • Record Processing: Once partitions have been opened for writing, Connect will begin forwarding * records from Kafka using the {@link #put(Collection)} API. Periodically, Connect will ask the task - * to flush records using {@link #flush(Map)}. All records with offsets , are considered "committed" - * after flushing
  • + * to flush records using {@link #flush(Map)} as described above. *
  • Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. * When this happens, the currently assigned partitions will be closed with {@link #close(Collection)} and * the new assignment will be opened using {@link #open(Collection)}.
  • From 467e6ac502222843a057808b7e97fe113db488d3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Jan 2016 11:29:04 -0800 Subject: [PATCH 9/9] minor fixes --- .../java/org/apache/kafka/connect/runtime/WorkerSinkTask.java | 2 +- .../main/java/org/apache/kafka/connect/runtime/WorkerTask.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index a84ee92b36619..8c5bd9fddca3f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -361,7 +361,7 @@ private void deliverMessages() { // Let this exit normally, the batch will be reprocessed on the next loop. } catch (Throwable t) { log.error("Task {} threw an uncaught and unrecoverable exception", id, t); - log.error("Task is being killed and will not recover until manually restarted:"); + log.error("Task is being killed and will not recover until manually restarted"); throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception."); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 7c16250924575..b4d427a545841 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -99,7 +99,7 @@ private void doClose() { @Override public void run() { if (!this.running.compareAndSet(false, true)) - return; + throw new IllegalStateException("The task cannot be started while still running"); try { execute();