[FLINK-3067] [kafka connector] Use curator for committing offsets to ZK from Kafka

This closes #1451
rmetzger authored and StephanEwen committed Dec 30, 2015
1 parent 73e8586 commit 4f8c5e8
Showing 6 changed files with 104 additions and 78 deletions.
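
For orientation, a minimal sketch (not part of this commit) of the Curator-based access pattern the change introduces: committing and reading a consumer offset at Kafka's /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt; znode. The connection string, group, and topic names below are placeholders.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class OffsetCommitSketch {
	public static void main(String[] args) throws Exception {
		// connect with an exponential-backoff retry policy, as the new handler does
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				"localhost:2181", new ExponentialBackoffRetry(100, 10));
		client.start();

		String path = "/consumers/my-group/offsets/my-topic/0";
		if (client.checkExists().forPath(path) == null) {
			client.create().creatingParentsIfNeeded().forPath(path);
		}

		// commit an offset by writing it as a string into the znode
		client.setData().forPath(path, Long.toString(42L).getBytes());

		// read it back
		byte[] data = client.getData().forPath(path);
		System.out.println("committed offset = " + Long.parseLong(new String(data)));

		client.close();
	}
}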
22 changes: 7 additions & 15 deletions flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -93,12 +93,7 @@ under the License.
</dependency>

<!-- force using the latest zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
<type>jar</type>
</dependency>


<dependency>
<groupId>com.google.guava</groupId>
@@ -113,6 +108,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-curator-recipes</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
@@ -122,15 +123,6 @@ under the License.

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
</dependencies>
</dependencyManagement>


<build>
@@ -241,7 +241,7 @@ public enum FetcherType {
* Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
*
* <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
* at the beginnign of this class.</p>
* at the beginning of this class.</p>
*
* @param topic
* The Kafka topic to read from.
@@ -264,7 +264,7 @@ public FlinkKafkaConsumer(List<String> topic, DeserializationSchema<T> deseriali
* Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
*
* <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
* at the beginnign of this class.</p>
* at the beginning of this class.</p>
*
* @param topics
* The Kafka topics to read from.
@@ -522,7 +522,7 @@ public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long
}

// the use of clone() is okay here, we just need a new map, the keys are not changed
//noinspection unchecked
@SuppressWarnings("unchecked")
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();

// the map cannot be asynchronously updated, because only one checkpoint call can happen
@@ -570,7 +570,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {

//noinspection unchecked
checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);


// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
@@ -613,8 +612,10 @@ protected static List<KafkaTopicPartitionLeader> assignPartitions(List<KafkaTopi
* Thread to periodically commit the current read offset into Zookeeper.
*/
private static class PeriodicOffsetCommitter<T> extends Thread {

private final long commitInterval;
private final FlinkKafkaConsumer<T> consumer;

private volatile boolean running = true;

public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer<T> consumer) {
@@ -625,22 +626,20 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsum
@Override
public void run() {
try {

while (running) {
try {
Thread.sleep(commitInterval);
// ------------ commit current offsets ----------------

// create copy of current offsets
//noinspection unchecked
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
commitOffsets(currentOffsets, this.consumer);
} catch (InterruptedException e) {
if (running) {
// throw unexpected interruption
throw e;
}
// looks like the thread is being closed. Leave loop
break;
}
}
} catch (Throwable t) {
@@ -18,20 +18,18 @@

package org.apache.flink.streaming.connectors.kafka.internals;

import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.zookeeper.data.Stat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Option;

import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -42,16 +40,14 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);

private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;


private final ZkClient zkClient;


private final String groupId;


private final CuratorFramework curatorClient;


public ZookeeperOffsetHandler(Properties props) {
this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);

if (this.groupId == null) {
throw new IllegalArgumentException("Required property '"
+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
@@ -61,30 +57,37 @@ public ZookeeperOffsetHandler(Properties props) {
if (zkConnect == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
}

// we use Curator's default timeouts
int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

zkClient = new ZkClient(zkConnect,
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
new ZooKeeperStringSerializer());
// undocumented config options allowing users to configure the retry policy (they are "flink."-prefixed as they are not official Kafka configs)
int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));

RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
curatorClient.start();
}


@Override
public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
KafkaTopicPartition tp = entry.getKey();
long offset = entry.getValue();

if (offset >= 0) {
setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset);
setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
}
}
}

@Override
public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception {
for (KafkaTopicPartitionLeader tp : partitions) {
long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());

if (offset != OFFSET_NOT_SET) {
LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
@@ -98,30 +101,43 @@ public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitio

@Override
public void close() throws IOException {
zkClient.close();
curatorClient.close();
}

// ------------------------------------------------------------------------
// Communication with Zookeeper
// ------------------------------------------------------------------------

public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
String path = topicDirs.consumerOffsetDir() + "/" + partition;
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
byte[] data = Long.toString(offset).getBytes();
curatorClient.setData().forPath(path, data);
}

public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());

scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
topicDirs.consumerOffsetDir() + "/" + tap.partition());

if (data._1().isEmpty()) {
public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
String path = topicDirs.consumerOffsetDir() + "/" + partition;
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());

byte[] data = curatorClient.getData().forPath(path);
if (data == null) {
return OFFSET_NOT_SET;
} else {
return Long.valueOf(data._1().get());
String asString = new String(data);
if (asString.length() == 0) {
return OFFSET_NOT_SET;
} else {
try {
return Long.parseLong(asString);
} catch (NumberFormatException e) {
throw new Exception(String.format(
"The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s",
groupId, topic, partition, asString));
}
}
}
}
}
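
The ZookeeperOffsetHandler constructor above reads its connection, timeout, and retry settings from the consumer's Properties. A sketch of the user-side configuration, using the property keys shown in the diff; the values are illustrative, and an actual run assumes a reachable ZooKeeper.

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class OffsetHandlerConfigSketch {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("group.id", "my-consumer-group");       // required
		props.setProperty("zookeeper.connect", "localhost:2181"); // required

		// Curator session / connection timeouts (defaults above: 60000 / 15000 ms)
		props.setProperty("zookeeper.session.timeout.ms", "60000");
		props.setProperty("zookeeper.connection.timeout.ms", "15000");

		// undocumented "flink."-prefixed knobs for the ExponentialBackoffRetry policy
		props.setProperty("flink.zookeeper.base-sleep-time.ms", "100");
		props.setProperty("flink.zookeeper.max-retries", "10");

		ZookeeperOffsetHandler handler = new ZookeeperOffsetHandler(props);
		handler.close();
	}
}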
@@ -31,6 +31,7 @@
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.map.LinkedMap;

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -58,6 +59,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -260,7 +262,7 @@ public void runOffsetInZookeeperValidationTest() throws Exception {

readSequence(env2, standardProps, parallelism, topicName, 100, 0);

ZkClient zkClient = createZookeeperClient();
CuratorFramework zkClient = createZookeeperClient();

long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
@@ -321,7 +323,7 @@ public void runOffsetAutocommitTest() throws Exception {
readSequence(env2, readProps, parallelism, topicName, 100, 0);

// get the offset
ZkClient zkClient = createZookeeperClient();
CuratorFramework zkClient = createZookeeperClient();

long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
@@ -796,7 +798,7 @@ public void runInvalidOffsetTest() throws Exception {
writeSequence(env, topic, 20, parallelism);

// set invalid offset:
ZkClient zkClient = createZookeeperClient();
CuratorFramework zkClient = createZookeeperClient();
ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);

// read from topic
@@ -1016,20 +1018,24 @@ public void runBrokerFailureTest() throws Exception {
topic, parallelism, numElementsPerPartition, true);

// find leader to shut down
ZkClient zkClient = createZookeeperClient();
PartitionMetadata firstPart = null;
do {
if (firstPart != null) {
LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
// not the first try. Sleep a bit
Thread.sleep(150);
}
{
ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());

do {
if (firstPart != null) {
LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
// not the first try. Sleep a bit
Thread.sleep(150);
}

Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
firstPart = partitionMetadata.head();
Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
firstPart = partitionMetadata.head();
}
while (firstPart.errorCode() != 0);
zkClient.close();
}
while (firstPart.errorCode() != 0);
zkClient.close();

final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
final String leaderToShutDownConnection =
@@ -27,6 +27,10 @@
import org.I0Itec.zkclient.ZkClient;

import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
@@ -292,9 +296,11 @@ protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
// Execution utilities
// ------------------------------------------------------------------------

protected ZkClient createZookeeperClient() {
return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
protected CuratorFramework createZookeeperClient() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
curatorClient.start();
return curatorClient;
}

protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
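
For tests, Curator also ships an in-process ZooKeeper (TestingServer, imported above). A sketch (not part of this commit) of standing one up and connecting to it with the same retry settings used by createZookeeperClient(); class and znode names are illustrative.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class ZkTestSketch {
	public static void main(String[] args) throws Exception {
		TestingServer zkServer = new TestingServer(); // picks a free port
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				zkServer.getConnectString(), new ExponentialBackoffRetry(100, 10));
		client.start();

		String path = "/consumers/test-group/offsets/test-topic/0";
		client.create().creatingParentsIfNeeded().forPath(path, "0".getBytes());
		System.out.println("offset = " + new String(client.getData().forPath(path)));

		client.close();
		zkServer.close();
	}
}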
