From 35aa7b69e06d652642f6a501bfb2234fa7e5fa6c Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 20 Oct 2015 18:00:26 -0700 Subject: [PATCH 1/6] Add backoff timeout and support rewinds --- .../kafka/copycat/sink/SinkTaskContext.java | 38 +++++++++++ .../kafka/copycat/runtime/WorkerSinkTask.java | 18 +++++ .../copycat/runtime/WorkerSinkTaskTest.java | 68 ++++++++++++++++++- 3 files changed, 121 insertions(+), 3 deletions(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index 3ecff2708fadd..a8b58d8119620 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,9 +31,12 @@ @InterfaceStability.Unstable public abstract class SinkTaskContext { private Map offsets; + private long backoffMs = -1L; + private Set rewinds; public SinkTaskContext() { offsets = new HashMap<>(); + rewinds = new HashSet<>(); } /** @@ -58,6 +62,40 @@ public Map offsets() { return offsets; } + /** + * Set the backoff timeout. SinkTasks should use this to indicate that they need to retry certain + * operations after the backoff timeout. + * @param backoffMs the backoff timeout in milliseconds. + */ + public void backoffMs(long backoffMs) { + this.backoffMs = backoffMs; + } + + /** + * Get the backoff in milliseconds set by SinkTasks. Used by the Copycat framework. + * @return the backoff timeout in milliseconds. + */ + public long backoffMs() { + return backoffMs; + } + + /** + * Set the topics that needs to rewind. SinkTask should use this to indicate that some topic + * partitions needs to rewind. + * @param rewinds The set of topic partitions that needs to rewind. + */ + public void rewinds(Set rewinds) { + this.rewinds = rewinds; + } + + /** + * Get the topic partitions that needs to rewind. + * @return the set of topic partitions that needs to rewind. + */ + public Set rewinds() { + return rewinds; + } + /** * Get the current set of assigned TopicPartitions for this task. * @return the set of currently assigned TopicPartitions diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index 70b99d04653cb..51511a4d18dec 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -108,6 +108,9 @@ public void close() { /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ public void poll(long timeoutMs) { try { + rewind(); + long backOffMs = context.backoffMs(); + timeoutMs = Math.max(timeoutMs, backOffMs); log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); ConsumerRecords msgs = consumer.poll(timeoutMs); log.trace("{} polling returned {} messages", id, msgs.count()); @@ -236,6 +239,21 @@ private void deliverMessages(ConsumerRecords msgs) { } } + private void rewind() { + Set rewinds = context.rewinds(); + if (rewinds.isEmpty()) { + return; + } + Map offsets = context.offsets(); + for (TopicPartition tp: rewinds) { + Long offset = offsets.get(tp); + if (offset != null) { + log.trace("Rewind {} to offset {}.", tp, offset); + consumer.seek(tp, offset); + } + } + rewinds.clear(); + } private class WorkerSinkTaskContext extends SinkTaskContext { @Override diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index e5e5b855f41a4..29e5aadd765e0 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -17,9 +17,14 @@ package org.apache.kafka.copycat.runtime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; @@ -31,7 +36,11 @@ import org.apache.kafka.copycat.util.ConnectorTaskId; import org.apache.kafka.copycat.util.MockTime; import org.apache.kafka.copycat.util.ThreadedTest; -import org.easymock.*; +import org.easymock.Capture; +import org.easymock.CaptureType; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -47,6 +56,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -349,6 +359,58 @@ public Object answer() throws Throwable { PowerMock.verifyAll(); } + @Test + public void testRewind() throws Exception { + Properties taskProps = new Properties(); + expectInitializeTask(taskProps); + final long startOffset = 40L; + + expectOnePoll().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + sinkTaskContext.getValue().rewinds(new HashSet<>(Arrays.asList(TOPIC_PARTITION))); + sinkTaskContext.getValue().offset( + Collections.singletonMap(TOPIC_PARTITION, startOffset)); + return null; + } + }); + + expectOnePoll().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + Set rewinds = sinkTaskContext.getValue().rewinds(); + assertEquals(1, rewinds.size()); + assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION)), rewinds); + return null; + } + }); + consumer.seek(TOPIC_PARTITION, startOffset); + EasyMock.expectLastCall(); + + + expectOnePoll().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + Set rewinds = sinkTaskContext.getValue().rewinds(); + assertEquals(0, rewinds.size()); + return null; + } + }); + + expectStopTask(3); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + workerTask.stop(); + // No need for awaitStop since the thread is mocked + workerTask.close(); + + PowerMock.verifyAll(); + } private void expectInitializeTask(Properties taskProps) throws Exception { sinkTask.initialize(EasyMock.capture(sinkTaskContext)); From 48efb4de875f4e9975d05c07d1993b50be9a29f3 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 29 Oct 2015 13:00:12 -0700 Subject: [PATCH 2/6] Address review comments --- .../apache/kafka/copycat/sink/SinkTask.java | 7 ++++++ .../kafka/copycat/sink/SinkTaskContext.java | 24 ++---------------- .../kafka/copycat/runtime/WorkerSinkTask.java | 25 +++++++++++++------ .../copycat/runtime/WorkerSinkTaskTest.java | 24 +++++------------- 4 files changed, 33 insertions(+), 47 deletions(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index bf5d152100d04..499ba3b16842a 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -62,4 +62,11 @@ public void initialize(SinkTaskContext context) { * @param offsets mapping of TopicPartition to committed offset */ public abstract void flush(Map offsets); + + + public void onPartitionsAssigned(Collection partitions) { + } + + public void onPartitionsRevoked(Collection partitions) { + } } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index a8b58d8119620..d5f72f749fe9f 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -32,11 +31,9 @@ public abstract class SinkTaskContext { private Map offsets; private long backoffMs = -1L; - private Set rewinds; public SinkTaskContext() { offsets = new HashMap<>(); - rewinds = new HashSet<>(); } /** @@ -67,7 +64,7 @@ public Map offsets() { * operations after the backoff timeout. * @param backoffMs the backoff timeout in milliseconds. */ - public void backoffMs(long backoffMs) { + public void backoff(long backoffMs) { this.backoffMs = backoffMs; } @@ -75,27 +72,10 @@ public void backoffMs(long backoffMs) { * Get the backoff in milliseconds set by SinkTasks. Used by the Copycat framework. * @return the backoff timeout in milliseconds. */ - public long backoffMs() { + public long backoff() { return backoffMs; } - /** - * Set the topics that needs to rewind. SinkTask should use this to indicate that some topic - * partitions needs to rewind. - * @param rewinds The set of topic partitions that needs to rewind. - */ - public void rewinds(Set rewinds) { - this.rewinds = rewinds; - } - - /** - * Get the topic partitions that needs to rewind. - * @return the set of topic partitions that needs to rewind. - */ - public Set rewinds() { - return rewinds; - } - /** * Get the current set of assigned TopicPartitions for this task. * @return the set of currently assigned TopicPartitions diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index 51511a4d18dec..c5d32d1370ede 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -109,7 +109,7 @@ public void close() { public void poll(long timeoutMs) { try { rewind(); - long backOffMs = context.backoffMs(); + long backOffMs = context.backoff(); timeoutMs = Math.max(timeoutMs, backOffMs); log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); ConsumerRecords msgs = consumer.poll(timeoutMs); @@ -192,7 +192,7 @@ private KafkaConsumer createConsumer(Properties taskProps) { } log.debug("Task {} subscribing to topics {}", id, topics); - newConsumer.subscribe(Arrays.asList(topics)); + newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance()); // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to // enable exactly once delivery to that system). @@ -240,19 +240,18 @@ private void deliverMessages(ConsumerRecords msgs) { } private void rewind() { - Set rewinds = context.rewinds(); - if (rewinds.isEmpty()) { + Map offsets = context.offsets(); + if (offsets.isEmpty()) { return; } - Map offsets = context.offsets(); - for (TopicPartition tp: rewinds) { + for (TopicPartition tp: offsets.keySet()) { Long offset = offsets.get(tp); if (offset != null) { log.trace("Rewind {} to offset {}.", tp, offset); consumer.seek(tp, offset); } } - rewinds.clear(); + offsets.clear(); } private class WorkerSinkTaskContext extends SinkTaskContext { @@ -285,4 +284,16 @@ public void resume(TopicPartition... partitions) { } } } + + private class HandleRebalance implements ConsumerRebalanceListener { + @Override + public void onPartitionsAssigned(Collection partitions) { + task.onPartitionsAssigned(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + task.onPartitionsRevoked(partitions); + } + } } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 29e5aadd765e0..75d09dd450128 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; @@ -53,10 +52,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; -import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -364,35 +363,25 @@ public void testRewind() throws Exception { Properties taskProps = new Properties(); expectInitializeTask(taskProps); final long startOffset = 40L; + final Map offsets = new HashMap<>(); expectOnePoll().andAnswer(new IAnswer() { @Override public Object answer() throws Throwable { - sinkTaskContext.getValue().rewinds(new HashSet<>(Arrays.asList(TOPIC_PARTITION))); - sinkTaskContext.getValue().offset( - Collections.singletonMap(TOPIC_PARTITION, startOffset)); + offsets.put(TOPIC_PARTITION, startOffset); + sinkTaskContext.getValue().offset(offsets); return null; } }); - expectOnePoll().andAnswer(new IAnswer() { - @Override - public Object answer() throws Throwable { - Set rewinds = sinkTaskContext.getValue().rewinds(); - assertEquals(1, rewinds.size()); - assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION)), rewinds); - return null; - } - }); consumer.seek(TOPIC_PARTITION, startOffset); EasyMock.expectLastCall(); - expectOnePoll().andAnswer(new IAnswer() { @Override public Object answer() throws Throwable { - Set rewinds = sinkTaskContext.getValue().rewinds(); - assertEquals(0, rewinds.size()); + Map offsets = sinkTaskContext.getValue().offsets(); + assertEquals(0, offsets.size()); return null; } }); @@ -404,7 +393,6 @@ public Object answer() throws Throwable { workerTask.start(taskProps); workerThread.iteration(); workerThread.iteration(); - workerThread.iteration(); workerTask.stop(); // No need for awaitStop since the thread is mocked workerTask.close(); From f8e193f3f092bfc6b4231ac37d82bde07fe1cd3f Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Mon, 2 Nov 2015 15:13:41 -0800 Subject: [PATCH 3/6] Address review comments --- .../org/apache/kafka/copycat/runtime/WorkerSinkTask.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index c5d32d1370ede..5b751c125a1d1 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -109,8 +109,11 @@ public void close() { public void poll(long timeoutMs) { try { rewind(); - long backOffMs = context.backoff(); - timeoutMs = Math.max(timeoutMs, backOffMs); + long backoff = context.backoff(); + if (backoff > 0) { + timeoutMs = Math.min(timeoutMs, backoff); + context.backoff(-1L); + } log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); ConsumerRecords msgs = consumer.poll(timeoutMs); log.trace("{} polling returned {} messages", id, msgs.count()); From 6fefacbdf155deff1df64a468d51cfd23fc01f70 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Mon, 2 Nov 2015 16:46:45 -0800 Subject: [PATCH 4/6] Address review comments and add necessary java docs --- .../org/apache/kafka/copycat/sink/SinkTask.java | 17 ++++++++++++++++- .../kafka/copycat/sink/SinkTaskContext.java | 6 ++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index 499ba3b16842a..c6cd12f67eb5a 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -63,10 +63,25 @@ public void initialize(SinkTaskContext context) { */ public abstract void flush(Map offsets); - + /** + * The SinkTask use this method to create writers for newly assigned partitions in case of partition + * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. + * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. + * This method will be called after partition re-assignment completes and before the SinkTask starts + * fetching data. + * @param partitions The list of partitions that are now assigned to the task (may include + * partitions previously assigned to the task) + */ public void onPartitionsAssigned(Collection partitions) { } + /** + * The SinkTask use this method to close writers and commit offsets for partitions that are + * longer assigned to the SinkTask. This method will be called before a rebalance operation starts + * and after the SinkTask stops fetching data. + * @param partitions The list of partitions that were assigned to the consumer on the last + * rebalance + */ public void onPartitionsRevoked(Collection partitions) { } } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index d5f72f749fe9f..f721cca19d083 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -60,8 +60,10 @@ public Map offsets() { } /** - * Set the backoff timeout. SinkTasks should use this to indicate that they need to retry certain - * operations after the backoff timeout. + * Set the backoff timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain + * operations after the backoff timeout. SinkTasks may have certain operations on external systems that may need + * to retry in case of failures. For example, append a record to an HDFS file may fail due to temporary network + * issues. SinkTasks use this method to set how long to wait before retrying. * @param backoffMs the backoff timeout in milliseconds. */ public void backoff(long backoffMs) { From 1c110dafe9d5b49bb9bb579da972e3eea4435c88 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Mon, 2 Nov 2015 18:55:14 -0800 Subject: [PATCH 5/6] Move some methods from SinkTaskContext to WorkerSinkTaskContext --- .../kafka/copycat/sink/SinkTaskContext.java | 36 ++++----- .../kafka/copycat/runtime/WorkerSinkTask.java | 55 ++++--------- .../runtime/WorkerSinkTaskContext.java | 77 +++++++++++++++++++ .../copycat/runtime/WorkerSinkTaskTest.java | 3 +- 4 files changed, 112 insertions(+), 59 deletions(-) create mode 100644 copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index f721cca19d083..59bc2bddc5d15 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -29,19 +29,18 @@ */ @InterfaceStability.Unstable public abstract class SinkTaskContext { - private Map offsets; - private long backoffMs = -1L; + protected Map offsets; + protected long backoffMs = -1L; public SinkTaskContext() { offsets = new HashMap<>(); } /** - * Reset the consumer offsets for the given topic partitions. SinkTasks should use this when they are started - * if they manage offsets in the sink data store rather than using Kafka consumer offsets. For example, an HDFS - * connector might record offsets in HDFS to provide exactly once delivery. When the SinkTask is started or - * a rebalance occurs, the task would reload offsets from HDFS and use this method to reset the consumer to those - * offsets. + * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets + * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record + * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task + * would reload offsets from HDFS and use this method to reset the consumer to those offsets. * * SinkTasks that do not manage their own offsets do not need to use this method. * @@ -52,11 +51,18 @@ public void offset(Map offsets) { } /** - * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework. - * @return the map of offsets + * Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets + * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record + * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered the task + * would reload offsets from HDFS and use this method to reset the consumer to the offset. + * + * SinkTasks that do not manage their own offsets do not need to use this method. + * + * @param tp the topic partition to reset offset. + * @param offset the offset to reset to. */ - public Map offsets() { - return offsets; + public void offset(TopicPartition tp, long offset) { + offsets.put(tp, offset); } /** @@ -70,14 +76,6 @@ public void backoff(long backoffMs) { this.backoffMs = backoffMs; } - /** - * Get the backoff in milliseconds set by SinkTasks. Used by the Copycat framework. - * @return the backoff timeout in milliseconds. - */ - public long backoff() { - return backoffMs; - } - /** * Get the current set of assigned TopicPartitions for this task. * @return the set of currently assigned TopicPartitions diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index 5b751c125a1d1..b75fa743efe0a 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -17,24 +17,34 @@ package org.apache.kafka.copycat.runtime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.IllegalWorkerStateException; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.sink.SinkTaskContext; import org.apache.kafka.copycat.storage.Converter; import org.apache.kafka.copycat.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; /** @@ -51,7 +61,7 @@ class WorkerSinkTask implements WorkerTask { private final Converter valueConverter; private WorkerSinkTaskThread workThread; private KafkaConsumer consumer; - private final SinkTaskContext context; + private WorkerSinkTaskContext context; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, Converter keyConverter, Converter valueConverter, Time time) { @@ -61,14 +71,14 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.time = time; - this.context = new WorkerSinkTaskContext(); } @Override public void start(Properties props) { + consumer = createConsumer(props); + context = new WorkerSinkTaskContext(consumer); task.initialize(context); task.start(props); - consumer = createConsumer(props); workThread = createWorkerThread(); workThread.start(); } @@ -257,37 +267,6 @@ private void rewind() { offsets.clear(); } - private class WorkerSinkTaskContext extends SinkTaskContext { - @Override - public Set assignment() { - if (consumer == null) - throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized"); - return consumer.assignment(); - } - - @Override - public void pause(TopicPartition... partitions) { - if (consumer == null) - throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized"); - try { - consumer.pause(partitions); - } catch (IllegalStateException e) { - throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e); - } - } - - @Override - public void resume(TopicPartition... partitions) { - if (consumer == null) - throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); - try { - consumer.resume(partitions); - } catch (IllegalStateException e) { - throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); - } - } - } - private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection partitions) { diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java new file mode 100644 index 0000000000000..60c6cd840ef53 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.errors.IllegalWorkerStateException; +import org.apache.kafka.copycat.sink.SinkTaskContext; + +import java.util.Map; +import java.util.Set; + +public class WorkerSinkTaskContext extends SinkTaskContext { + + KafkaConsumer consumer; + + public WorkerSinkTaskContext(KafkaConsumer consumer) { + this.consumer = consumer; + } + + /** + * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework. + * @return the map of offsets + */ + public Map offsets() { + return offsets; + } + + /** + * Get the backoff in milliseconds set by SinkTasks. Used by the Copycat framework. + * @return the backoff timeout in milliseconds. + */ + public long backoff() { + return backoffMs; + } + + @Override + public Set assignment() { + if (consumer == null) { + throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized"); + } + return consumer.assignment(); + } + + @Override + public void pause(TopicPartition... partitions) { + if (consumer == null) { + throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized"); + } + try { + consumer.pause(partitions); + } catch (IllegalStateException e) { + throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e); + } + } + + @Override + public void resume(TopicPartition... partitions) { + if (consumer == null) { + throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); + } + try { + consumer.resume(partitions); + } catch (IllegalStateException e) { + throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); + } + } +} diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 75d09dd450128..08707c944c2bb 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.sink.SinkTaskContext; import org.apache.kafka.copycat.storage.Converter; import org.apache.kafka.copycat.util.ConnectorTaskId; import org.apache.kafka.copycat.util.MockTime; @@ -87,7 +86,7 @@ public class WorkerSinkTaskTest extends ThreadedTest { private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private Time time; @Mock private SinkTask sinkTask; - private Capture sinkTaskContext = EasyMock.newCapture(); + private Capture sinkTaskContext = EasyMock.newCapture(); private WorkerConfig workerConfig; @Mock private Converter keyConverter; @Mock From 4a801add2f8f2f58381dc8ca7ef5e02d9bab00f6 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 3 Nov 2015 13:14:45 -0800 Subject: [PATCH 6/6] Change backoff to timeout --- .../apache/kafka/copycat/sink/SinkTaskContext.java | 12 ++++++------ .../apache/kafka/copycat/runtime/WorkerSinkTask.java | 8 ++++---- .../kafka/copycat/runtime/WorkerSinkTaskContext.java | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index 59bc2bddc5d15..399dcef7ac587 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -30,7 +30,7 @@ @InterfaceStability.Unstable public abstract class SinkTaskContext { protected Map offsets; - protected long backoffMs = -1L; + protected long timeoutMs = -1L; public SinkTaskContext() { offsets = new HashMap<>(); @@ -66,14 +66,14 @@ public void offset(TopicPartition tp, long offset) { } /** - * Set the backoff timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain - * operations after the backoff timeout. SinkTasks may have certain operations on external systems that may need + * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain + * operations after the timeout. SinkTasks may have certain operations on external systems that may need * to retry in case of failures. For example, append a record to an HDFS file may fail due to temporary network * issues. SinkTasks use this method to set how long to wait before retrying. - * @param backoffMs the backoff timeout in milliseconds. + * @param timeoutMs the backoff timeout in milliseconds. */ - public void backoff(long backoffMs) { - this.backoffMs = backoffMs; + public void timeout(long timeoutMs) { + this.timeoutMs = timeoutMs; } /** diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index b75fa743efe0a..e9aa055a2a8db 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -119,10 +119,10 @@ public void close() { public void poll(long timeoutMs) { try { rewind(); - long backoff = context.backoff(); - if (backoff > 0) { - timeoutMs = Math.min(timeoutMs, backoff); - context.backoff(-1L); + long retryTimeout = context.timeout(); + if (retryTimeout > 0) { + timeoutMs = Math.min(timeoutMs, retryTimeout); + context.timeout(-1L); } log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); ConsumerRecords msgs = consumer.poll(timeoutMs); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java index 60c6cd840ef53..b8d7d54095e0b 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java @@ -36,11 +36,11 @@ public Map offsets() { } /** - * Get the backoff in milliseconds set by SinkTasks. Used by the Copycat framework. + * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework. * @return the backoff timeout in milliseconds. */ - public long backoff() { - return backoffMs; + public long timeout() { + return timeoutMs; } @Override