From 648ce763fa5537b118e02b47b5c0c1547340a6c7 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Sat, 12 Dec 2015 19:44:17 +0100 Subject: [PATCH 1/2] [FLINK-3067] Use curator for committing offsets to ZK from Kafka --- .../flink-connector-kafka/pom.xml | 22 ++---- .../connectors/kafka/FlinkKafkaConsumer.java | 1 + .../internals/ZookeeperOffsetHandler.java | 77 +++++++++++-------- .../kafka/KafkaConsumerTestBase.java | 34 ++++---- .../connectors/kafka/KafkaTestBase.java | 12 ++- .../internals/ZookeeperOffsetHandlerTest.java | 17 ++-- 6 files changed, 93 insertions(+), 70 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml index fe763f35c8c0a..7bd9bcbf83e1f 100644 --- a/flink-streaming-connectors/flink-connector-kafka/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka/pom.xml @@ -93,12 +93,7 @@ under the License. - - com.101tec - zkclient - 0.7 - jar - + com.google.guava @@ -113,6 +108,12 @@ under the License. test + + org.apache.flink + flink-shaded-curator-recipes + ${project.version} + + org.apache.flink flink-test-utils @@ -122,15 +123,6 @@ under the License. - - - - com.101tec - zkclient - 0.7 - - - diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index b139e95351b0e..e6eddb4881cea 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -632,6 +632,7 @@ public void run() { // ------------ commit current offsets ---------------- // create copy of current offsets + //noinspection unchecked HashMap currentOffsets = (HashMap) consumer.lastOffsets.clone(); commitOffsets(currentOffsets, this.consumer); } catch (InterruptedException e) { diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index f9b8448a4e547..6f9f192b54f2d 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -18,19 +18,20 @@ package org.apache.flink.streaming.connectors.kafka.internals; -import kafka.common.TopicAndPartition; import kafka.utils.ZKGroupTopicDirs; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import java.io.IOException; import java.util.List; @@ -42,13 +43,12 @@ public class ZookeeperOffsetHandler implements OffsetHandler { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET; - - - private final ZkClient zkClient; - + private final String groupId; - + private final CuratorFramework curatorClient; + + public ZookeeperOffsetHandler(Properties props) { this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG); @@ -61,30 +61,29 @@ public ZookeeperOffsetHandler(Properties props) { if (zkConnect == null) { throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set"); } - - zkClient = new ZkClient(zkConnect, - Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")), - Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")), - new ZooKeeperStringSerializer()); + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); + curatorClient = CuratorFrameworkFactory.newClient(zkConnect, retryPolicy); + curatorClient.start(); } @Override - public void commit(Map offsetsToCommit) { + public void commit(Map offsetsToCommit) throws Exception { for (Map.Entry entry : offsetsToCommit.entrySet()) { KafkaTopicPartition tp = entry.getKey(); long offset = entry.getValue(); if (offset >= 0) { - setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset); + setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset); } } } @Override - public void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) { + public void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) throws Exception { for (KafkaTopicPartitionLeader tp : partitions) { - long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition()); + long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition()); if (offset != OFFSET_NOT_SET) { LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.", @@ -98,30 +97,42 @@ public void seekFetcherToInitialOffsets(List partitio @Override public void close() throws IOException { - zkClient.close(); + curatorClient.close(); } // ------------------------------------------------------------------------ // Communication with Zookeeper // ------------------------------------------------------------------------ - public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) { - TopicAndPartition tap = new TopicAndPartition(topic, partition); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic()); - ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset)); + public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + ensureExists(curatorClient, path); + byte[] data = Long.toString(offset).getBytes(); + curatorClient.setData().forPath(path, data); } - public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) { - TopicAndPartition tap = new TopicAndPartition(topic, partition); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic()); - - scala.Tuple2, Stat> data = ZkUtils.readDataMaybeNull(zkClient, - topicDirs.consumerOffsetDir() + "/" + tap.partition()); - - if (data._1().isEmpty()) { + public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + ensureExists(curatorClient, path); + byte[] data = curatorClient.getData().forPath(path); + if(data == null) { return OFFSET_NOT_SET; } else { - return Long.valueOf(data._1().get()); + String asString = new String(data); + if(asString.length() == 0) { + return OFFSET_NOT_SET; + } else { + return Long.valueOf(asString); + } + } + } + + private static void ensureExists(CuratorFramework curatorClient, String path) throws Exception { + Stat existsCheck = curatorClient.checkExists().forPath(path); + if(existsCheck == null) { + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), path); } } } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 4d8b7c3f3b44a..4f7138452c7ba 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.commons.collections.map.LinkedMap; +import org.apache.curator.framework.CuratorFramework; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -58,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; @@ -260,7 +262,7 @@ public void runOffsetInZookeeperValidationTest() throws Exception { readSequence(env2, standardProps, parallelism, topicName, 100, 0); - ZkClient zkClient = createZookeeperClient(); + CuratorFramework zkClient = createZookeeperClient(); long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0); long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1); @@ -321,7 +323,7 @@ public void runOffsetAutocommitTest() throws Exception { readSequence(env2, readProps, parallelism, topicName, 100, 0); // get the offset - ZkClient zkClient = createZookeeperClient(); + CuratorFramework zkClient = createZookeeperClient(); long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0); long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1); @@ -796,7 +798,7 @@ public void runInvalidOffsetTest() throws Exception { writeSequence(env, topic, 20, parallelism); // set invalid offset: - ZkClient zkClient = createZookeeperClient(); + CuratorFramework zkClient = createZookeeperClient(); ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234); // read from topic @@ -1016,20 +1018,24 @@ public void runBrokerFailureTest() throws Exception { topic, parallelism, numElementsPerPartition, true); // find leader to shut down - ZkClient zkClient = createZookeeperClient(); PartitionMetadata firstPart = null; - do { - if (firstPart != null) { - LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); - // not the first try. Sleep a bit - Thread.sleep(150); - } + { + ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), + standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. Sleep a bit + Thread.sleep(150); + } - Seq partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); - firstPart = partitionMetadata.head(); + Seq partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } + while (firstPart.errorCode() != 0); + zkClient.close(); } - while (firstPart.errorCode() != 0); - zkClient.close(); final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get(); final String leaderToShutDownConnection = diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 3550bd9d6ca67..779cce96f7e1e 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -27,6 +27,10 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; @@ -292,9 +296,11 @@ protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder, // Execution utilities // ------------------------------------------------------------------------ - protected ZkClient createZookeeperClient() { - return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), - standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + protected CuratorFramework createZookeeperClient() { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); + CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy); + curatorClient.start(); + return curatorClient; } protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception { diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java index 27ad2e82ec3b6..8d16da0a46c8a 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java @@ -21,6 +21,7 @@ import kafka.admin.AdminUtils; import org.I0Itec.zkclient.ZkClient; +import org.apache.curator.framework.CuratorFramework; import org.apache.flink.streaming.connectors.kafka.KafkaTestBase; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; @@ -41,14 +42,20 @@ public void runOffsetManipulationinZooKeeperTest() { final long offset = (long) (Math.random() * Long.MAX_VALUE); - ZkClient zkClient = createZookeeperClient(); - AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties()); + CuratorFramework curatorFramework = createZookeeperClient(); + + { + ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), + standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties()); + zkClient.close(); + } - ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); - long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0); + long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); - zkClient.close(); + curatorFramework.close(); assertEquals(offset, fetchedOffset); } From 9b5d8fe5590192735cedf3d713bc12de2ae5ff68 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Mon, 14 Dec 2015 19:47:21 +0100 Subject: [PATCH 2/2] PR review --- .../connectors/kafka/FlinkKafkaConsumer.java | 2 +- .../internals/ZookeeperOffsetHandler.java | 23 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index e6eddb4881cea..05f359ed1185a 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -522,7 +522,7 @@ public HashMap snapshotState(long checkpointId, long } // the use of clone() is okay here is okay, we just need a new map, the keys are not changed - //noinspection unchecked + @SuppressWarnings("unchecked") HashMap currentOffsets = (HashMap) lastOffsets.clone(); // the map cannot be asynchronously updated, because only one checkpoint call can happen diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index 6f9f192b54f2d..cd2eebd8643bc 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -62,8 +62,14 @@ public ZookeeperOffsetHandler(Properties props) { throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set"); } - RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); - curatorClient = CuratorFrameworkFactory.newClient(zkConnect, retryPolicy); + // we use Curator's default timeouts + int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000")); + int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000")); + // undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs) + int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); + int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries); + curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); curatorClient.start(); } @@ -107,7 +113,7 @@ public void close() throws IOException { public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception { ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); String path = topicDirs.consumerOffsetDir() + "/" + partition; - ensureExists(curatorClient, path); + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); byte[] data = Long.toString(offset).getBytes(); curatorClient.setData().forPath(path, data); } @@ -115,9 +121,9 @@ public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String g public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); String path = topicDirs.consumerOffsetDir() + "/" + partition; - ensureExists(curatorClient, path); + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); byte[] data = curatorClient.getData().forPath(path); - if(data == null) { + if (data == null) { return OFFSET_NOT_SET; } else { String asString = new String(data); @@ -128,11 +134,4 @@ public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String } } } - - private static void ensureExists(CuratorFramework curatorClient, String path) throws Exception { - Stat existsCheck = curatorClient.checkExists().forPath(path); - if(existsCheck == null) { - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), path); - } - } }