diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml
index fe763f35c8c0a..7bd9bcbf83e1f 100644
--- a/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -93,12 +93,7 @@ under the License.
-		<dependency>
-			<groupId>com.101tec</groupId>
-			<artifactId>zkclient</artifactId>
-			<version>0.7</version>
-			<type>jar</type>
-		</dependency>
+
			<groupId>com.google.guava</groupId>
@@ -113,6 +108,12 @@ under the License.
			<scope>test</scope>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils</artifactId>
@@ -122,15 +123,6 @@ under the License.
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>com.101tec</groupId>
-				<artifactId>zkclient</artifactId>
-				<version>0.7</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index b139e95351b0e..69ed9bfaa18ec 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -241,7 +241,7 @@ public enum FetcherType {
* Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
*
* <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
- * at the beginnign of this class.
+ * at the beginning of this class.
*
* @param topic
* The Kafka topic to read from.
@@ -264,7 +264,7 @@ public FlinkKafkaConsumer(List<String> topic, DeserializationSchema<T> deseriali
* Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
*
* <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
- * at the beginnign of this class.
+ * at the beginning of this class.
*
* @param topics
* The Kafka topics to read from.
@@ -522,7 +522,7 @@ public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long
}
// the use of clone() is okay here, we just need a new map, the keys are not changed
- //noinspection unchecked
+ @SuppressWarnings("unchecked")
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
// the map cannot be asynchronously updated, because only one checkpoint call can happen
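
For reference, a small standalone example (not Flink code; key/value types chosen only for illustration) of the suppression style the hunk above switches to: HashMap.clone() returns Object, so the cast back to a typed map is unchecked, and @SuppressWarnings on the local variable declaration is the narrowest javac-visible way to silence it, unlike the IDE-only //noinspection hint.

import java.util.HashMap;

public class CloneCastExample {
	public static void main(String[] args) {
		HashMap<String, Long> lastOffsets = new HashMap<>();
		lastOffsets.put("topic-0", 42L);

		// javac only honors @SuppressWarnings on declarations, so the annotation
		// is attached to the local variable rather than left as an IDE comment.
		@SuppressWarnings("unchecked")
		HashMap<String, Long> currentOffsets = (HashMap<String, Long>) lastOffsets.clone();

		System.out.println(currentOffsets);
	}
}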
@@ -570,7 +570,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
//noinspection unchecked
checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
-
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
@@ -613,8 +612,10 @@ protected static List<KafkaTopicPartitionLeader> assignPartitions(List
	private static class PeriodicOffsetCommitter<T> extends Thread {
+
private final long commitInterval;
private final FlinkKafkaConsumer consumer;
+
private volatile boolean running = true;
public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
@@ -625,13 +626,13 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consum
@Override
public void run() {
try {
-
while (running) {
try {
Thread.sleep(commitInterval);
// ------------ commit current offsets ----------------
// create copy of current offsets
+ //noinspection unchecked
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
commitOffsets(currentOffsets, this.consumer);
} catch (InterruptedException e) {
@@ -639,8 +640,6 @@ public void run() {
// throw unexpected interruption
throw e;
}
- // looks like the thread is being closed. Leave loop
- break;
}
}
} catch (Throwable t) {
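
The hunks above tighten PeriodicOffsetCommitter's run() loop: the rethrow now covers only unexpected interruptions, and the stray break after it is removed so the while (running) condition alone ends the thread. A standalone sketch of that pattern (class and type names simplified, not the Flink class):

import java.util.HashMap;
import java.util.Map;

public class PeriodicCommitterSketch {

	interface Committer {
		void commit(Map<String, Long> offsets) throws Exception;
	}

	static class PeriodicCommitThread extends Thread {
		private final long commitInterval;
		private final HashMap<String, Long> lastOffsets;   // shared map, updated by the fetcher
		private final Committer committer;
		private volatile boolean running = true;

		PeriodicCommitThread(long commitInterval, HashMap<String, Long> lastOffsets, Committer committer) {
			this.commitInterval = commitInterval;
			this.lastOffsets = lastOffsets;
			this.committer = committer;
		}

		@Override
		public void run() {
			try {
				while (running) {
					try {
						Thread.sleep(commitInterval);
						// clone gives the committer a stable copy while the fetcher keeps updating the original
						@SuppressWarnings("unchecked")
						HashMap<String, Long> snapshot = (HashMap<String, Long>) lastOffsets.clone();
						committer.commit(snapshot);
					} catch (InterruptedException e) {
						if (running) {
							// interrupted although we are still supposed to run: propagate
							throw e;
						}
						// interrupted by close(): fall through, the loop condition ends the thread
					}
				}
			} catch (Throwable t) {
				t.printStackTrace();
			}
		}

		void close() {
			running = false;
			interrupt();
		}
	}

	public static void main(String[] args) throws Exception {
		HashMap<String, Long> offsets = new HashMap<>();
		offsets.put("topic-0", 0L);
		PeriodicCommitThread committer =
				new PeriodicCommitThread(100, offsets, m -> System.out.println("committing " + m));
		committer.start();
		Thread.sleep(350);
		committer.close();
		committer.join();
	}
}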
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index f9b8448a4e547..f72117d444941 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -18,20 +18,18 @@
package org.apache.flink.streaming.connectors.kafka.internals;
-import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -42,16 +40,14 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-
-
- private final ZkClient zkClient;
-
+
private final String groupId;
-
+ private final CuratorFramework curatorClient;
+
+
public ZookeeperOffsetHandler(Properties props) {
this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-
if (this.groupId == null) {
throw new IllegalArgumentException("Required property '"
+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
@@ -61,30 +57,37 @@ public ZookeeperOffsetHandler(Properties props) {
if (zkConnect == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
}
+
+ // we use Curator's default timeouts
+ int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+ int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
- zkClient = new ZkClient(zkConnect,
- Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
- Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
- new ZooKeeperStringSerializer());
+			// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are not official Kafka configs)
+ int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+ int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+ curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+ curatorClient.start();
}
@Override
-	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
+	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
KafkaTopicPartition tp = entry.getKey();
long offset = entry.getValue();
if (offset >= 0) {
- setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset);
+ setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
}
}
}
@Override
-	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
+	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception {
for (KafkaTopicPartitionLeader tp : partitions) {
- long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
+ long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
if (offset != OFFSET_NOT_SET) {
LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
@@ -98,30 +101,43 @@ public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitio
@Override
public void close() throws IOException {
- zkClient.close();
+ curatorClient.close();
}
// ------------------------------------------------------------------------
// Communication with Zookeeper
// ------------------------------------------------------------------------
- public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
- TopicAndPartition tap = new TopicAndPartition(topic, partition);
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+ public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+ String path = topicDirs.consumerOffsetDir() + "/" + partition;
+ curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+ byte[] data = Long.toString(offset).getBytes();
+ curatorClient.setData().forPath(path, data);
}
- public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
- TopicAndPartition tap = new TopicAndPartition(topic, partition);
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-
- scala.Tuple2
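
The removed lines above read the committed offset with Kafka's ZkUtils; the Curator-based replacement keeps the same znode layout derived from ZKGroupTopicDirs. A hedged, standalone sketch of that read/write path (helper names are illustrative, not the commit's exact code; it assumes a started CuratorFramework and a reachable ZooKeeper):

import kafka.utils.ZKGroupTopicDirs;
import org.apache.curator.framework.CuratorFramework;

public class OffsetZnodeSketch {

	// path layout used in the hunk above: <consumer dirs for the group>/offsets/<topic>/<partition>
	public static String offsetPath(String groupId, String topic, int partition) {
		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
		return topicDirs.consumerOffsetDir() + "/" + partition;
	}

	public static void writeOffset(CuratorFramework client, String path, long offset) throws Exception {
		// create the full path if it does not exist yet, then overwrite its payload
		client.newNamespaceAwareEnsurePath(path).ensure(client.getZookeeperClient());
		client.setData().forPath(path, Long.toString(offset).getBytes());
	}

	public static long readOffset(CuratorFramework client, String path, long offsetNotSet) throws Exception {
		if (client.checkExists().forPath(path) == null) {
			return offsetNotSet;              // no offset ever committed for this partition
		}
		byte[] data = client.getData().forPath(path);
		if (data == null || data.length == 0) {
			return offsetNotSet;              // znode exists but carries no payload
		}
		return Long.parseLong(new String(data));
	}
}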